Disruptor—3.核心源码实现分析
大纲
1.Disruptor的生产者源码分析
2.Disruptor的消费者源码分析
3.Disruptor的WaitStrategy等待策略分析
4.Disruptor的高性能原因
5.Disruptor高性能之数据结构(内存预加载机制)
6.Disruptor高性能之内核(使用单线程写)
7.Disruptor高性能之系统内存优化(内存屏障)
8.Disruptor高性能之系统缓存优化(消除伪共享)
9.Disruptor高性能之序号获取优化(自旋 + CAS)
1.Disruptor的生产者源码分析
(1)通过Sequence序号发布消息
(2)通过Translator事件转换器发布消息
(1)通过Sequence序号发布消息
生产者可以先从RingBuffer中获取一个可用的Sequence序号,然后再根据该Sequence序号从RingBuffer的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用RingBuffer的publish()方法设置当前生产者的Sequence序号来完成事件消息的发布。
//注意:这里使用的版本是3.4.4 //单生产者单消费者的使用示例 public class Main { public static void main(String[] args) { //参数准备 OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 4; ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //参数一:eventFactory,消息(Event)工厂对象 //参数二:ringBufferSize,容器的长度 //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler //参数四:ProducerType,单生产者还是多生产者 //参数五:waitStrategy,等待策略 //1.实例化Disruptor对象 Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>( orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy() ); //2.添加Event处理器,用于处理事件 //也就是构建Disruptor与消费者的一个关联关系 disruptor.handleEventsWith(new OrderEventHandler()); //3.启动Disruptor disruptor.start(); //4.获取实际存储数据的容器: RingBuffer RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 5; i++) { bb.putLong(0, i); //向容器中投递数据 producer.sendData(bb); } disruptor.shutdown(); executor.shutdown(); } } public class OrderEventProducer { private RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data) { //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号 long sequence = ringBuffer.next(); try { //2.根据这个序号, 找到具体的"OrderEvent"元素 //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象" OrderEvent event = ringBuffer.get(sequence); //3.进行实际的赋值处理 event.setValue(data.getLong(0)); } finally { //4.提交发布操作 ringBuffer.publish(sequence); } } } public class OrderEventHandler implements EventHandler<OrderEvent> { public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception { Thread.sleep(1000); System.err.println("消费者: " + event.getValue()); } }
//多生产者多消费者的使用示例 public class Main { public static void main(String[] args) throws InterruptedException { //1.创建RingBuffer RingBuffer<Order> ringBuffer = RingBuffer.create( ProducerType.MULTI,//多生产者 new EventFactory<Order>() { public Order newInstance() { return new Order(); } }, 1024 * 1024, new YieldingWaitStrategy() ); //2.通过ringBuffer创建一个屏障 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口 Consumer[] consumers = new Consumer[10]; for (int i = 0; i < consumers.length; i++) { consumers[i] = new Consumer("C" + i); } //4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool WorkerPool<Order> workerPool = new WorkerPool<Order>( ringBuffer, sequenceBarrier, new EventExceptionHandler(), consumers ); //5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); //6.启动workerPool workerPool.start(Executors.newFixedThreadPool(5)); final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 100; i++) { final Producer producer = new Producer(ringBuffer); new Thread(new Runnable() { public void run() { try { latch.await(); } catch (Exception e) { e.printStackTrace(); } for (int j = 0; j < 100; j++) { producer.sendData(UUID.randomUUID().toString()); } } }).start(); } Thread.sleep(2000); System.err.println("----------线程创建完毕,开始生产数据----------"); latch.countDown(); Thread.sleep(10000); System.err.println("任务总数:" + consumers[2].getCount()); } } public class Producer { private RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(String uuid) { //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号 long sequence = ringBuffer.next(); try { //2.根据这个序号, 找到具体的"Order"元素 //注意:此时获取的Order对象是一个没有被赋值的"空对象" Order order = ringBuffer.get(sequence); //3.进行实际的赋值处理 order.setId(uuid); } finally { //4.提交发布操作 ringBuffer.publish(sequence); } } } public class Consumer implements WorkHandler<Order> { private static AtomicInteger 
count = new AtomicInteger(0); private String consumerId; private Random random = new Random(); public Consumer(String consumerId) { this.consumerId = consumerId; } public void onEvent(Order event) throws Exception { Thread.sleep(1 * random.nextInt(5)); System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId()); count.incrementAndGet(); } public int getCount() { return count.get(); } }
其中,RingBuffer的publish(sequence)方法会调用Sequencer接口的publish()方法来设置当前生产者的Sequence序号。
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7; } abstract class RingBufferFields<E> extends RingBufferPad { ... private static final Unsafe UNSAFE = Util.getUnsafe(); private final long indexMask; //环形数组存储事件消息 private final Object[] entries; protected final int bufferSize; //RingBuffer的sequencer属性代表了当前线程对应的生产者 protected final Sequencer sequencer; RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //初始化数组 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //内存预加载 fill(eventFactory); } private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } ... } public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { protected long p1, p2, p3, p4, p5, p6, p7; ... //Increment and return the next sequence for the ring buffer. //Calls of this method should ensure that they always publish the sequence afterward. //E.g. //long sequence = ringBuffer.next(); //try { // Event e = ringBuffer.get(sequence); // //Do some work with the event. //} finally { // ringBuffer.publish(sequence); //} //@return The next sequence to publish to. //@see RingBuffer#publish(long) //@see RingBuffer#get(long) @Override public long next() { return sequencer.next(); } //Publish the specified sequence. //This action marks this particular message as being available to be read. //@param sequence the sequence to publish. 
@Override public void publish(long sequence) { sequencer.publish(sequence); } //Get the event for a given sequence in the RingBuffer. //This call has 2 uses. //Firstly use this call when publishing to a ring buffer. //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long). //Secondly use this call when consuming data from the ring buffer. //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method. //@param sequence for the event //@return the event for the given sequence @Override public E get(long sequence) { //调用父类RingBufferFields的elementAt()方法 return elementAt(sequence); } ... }
RingBuffer的sequencer属性会在创建RingBuffer对象时传入,而创建RingBuffer对象的时机则是在初始化Disruptor的时候。
在Disruptor的构造方法中,会调用RingBuffer的create()方法,RingBuffer的create()方法会根据不同的生产者类型来初始化sequencer属性。
由生产者线程通过new创建的Sequencer接口实现类的实例就是一个生产者。单生产者的线程执行上面单生产者示例的main()方法时,会创建一个SingleProducerSequencer实例来代表生产者。多生产者的线程执行上面多生产者示例的main()方法时,会创建一个MultiProducerSequencer实例来代表生产者。
public class Disruptor<T> { private final RingBuffer<T> ringBuffer; private final Executor executor; private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>(); private final AtomicBoolean started = new AtomicBoolean(false); private ExceptionHandler<? super T> exceptionHandler; ... //Create a new Disruptor. //@param eventFactory the factory to create events in the ring buffer. //@param ringBufferSize the size of the ring buffer, must be power of 2. //@param executor an Executor to execute event processors. //@param producerType the claim strategy to use for the ring buffer. //@param waitStrategy the wait strategy to use for the ring buffer. public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) { this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor); } private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) { this.ringBuffer = ringBuffer; this.executor = executor; } ... } public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { protected long p1, p2, p3, p4, p5, p6, p7; ... //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI) //@param producerType producer type to use ProducerType. //@param factory used to create events within the ring buffer. //@param bufferSize number of elements to create within the ring buffer. //@param waitStrategy used to determine how to wait for new elements to become available. 
public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { switch (producerType) { case SINGLE: //单生产者模式下的当前生产者是一个SingleProducerSequencer实例 return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: //多生产者模式下的当前生产者是一个MultiProducerSequencer实例 return createMultiProducer(factory, bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); } } //Create a new single producer RingBuffer with the specified wait strategy. //@param <E> Class of the event stored in the ring buffer. //@param factory used to create the events within the ring buffer. //@param bufferSize number of elements to create within the ring buffer. //@param waitStrategy used to determine how to wait for new elements to become available. //@return a constructed ring buffer. public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); return new RingBuffer<E>(factory, sequencer); } //Create a new multiple producer RingBuffer with the specified wait strategy. //@param <E> Class of the event stored in the ring buffer. //@param factory used to create the events within the ring buffer. //@param bufferSize number of elements to create within the ring buffer. //@param waitStrategy used to determine how to wait for new elements to become available. //@return a constructed ring buffer. public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy); return new RingBuffer<E>(factory, sequencer); } //Construct a RingBuffer with the full option set. 
//@param eventFactory to newInstance entries for filling the RingBuffer //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer. RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) { super(eventFactory, sequencer); } ... } abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7; } abstract class RingBufferFields<E> extends RingBufferPad { ... private final long indexMask; //环形数组存储事件消息 private final Object[] entries; protected final int bufferSize; //RingBuffer的sequencer属性代表了当前线程对应的生产者 protected final Sequencer sequencer; RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //初始化数组 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //内存预加载 fill(eventFactory); } private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } ... }
SingleProducerSequencer的publish()方法在发布事件消息时,首先会设置当前生产者的Sequence,然后会通过等待策略通知阻塞的消费者。
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { ... //Publish the specified sequence. //This action marks this particular message as being available to be read. //@param sequence the sequence to publish. @Override public void publish(long sequence) { sequencer.publish(sequence); } ... } public abstract class AbstractSequencer implements Sequencer { private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); //环形数组的大小 protected final int bufferSize; //等待策略 protected final WaitStrategy waitStrategy; //当前生产者的进度 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler) //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时, //由RingBuffer的addGatingSequences()方法进行添加 protected volatile Sequence[] gatingSequences = new Sequence[0]; ... //Create with the specified buffer size and wait strategy. //@param bufferSize The total number of entries, must be a positive power of 2. //@param waitStrategy public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) { if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.bufferSize = bufferSize; this.waitStrategy = waitStrategy; } ... 
} abstract class SingleProducerSequencerPad extends AbstractSequencer { protected long p1, p2, p3, p4, p5, p6, p7; public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } } abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad { public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } //表示生产者的当前序号,值为-1 protected long nextValue = Sequence.INITIAL_VALUE; //表示消费者的最小序号,值为-1 protected long cachedValue = Sequence.INITIAL_VALUE; } public final class SingleProducerSequencer extends SingleProducerSequencerFields { protected long p1, p2, p3, p4, p5, p6, p7; //Construct a Sequencer with the selected wait strategy and buffer size. //@param bufferSize the size of the buffer that this will sequence over. //@param waitStrategy for those waiting on sequences. public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } @Override public void publish(long sequence) { //设置当前生产者的进度,cursor代表了当前生产者的Sequence cursor.set(sequence); //通过等待策略通知阻塞的消费者 waitStrategy.signalAllWhenBlocking(); } @Override public long next() { return next(1); } @Override public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long nextValue = this.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; } ... 
} class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; } class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; } //Concurrent sequence class used for tracking the progress of the ring buffer and event processors. //Support a number of concurrent operations including CAS and order writes. //Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field. public class Sequence extends RhsPadding { static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET; static { UNSAFE = Util.getUnsafe(); VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value")); } //Create a sequence initialised to -1. public Sequence() { this(INITIAL_VALUE); } //Create a sequence with a specified initial value. //@param initialValue The initial value for this sequence. public Sequence(final long initialValue) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); } //Perform a volatile read of this sequence's value. //@return The current value of the sequence. public long get() { return value; } //Perform an ordered write of this sequence. //The intent is a Store/Store barrier between this write and any previous store. //@param value The new value for the sequence. public void set(final long value) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); } //Performs a volatile write of this sequence. //The intent is a Store/Store barrier between this write and //any previous write and a Store/Load barrier between this write and //any subsequent volatile read. //@param value The new value for the sequence. public void setVolatile(final long value) { UNSAFE.putLongVolatile(this, VALUE_OFFSET, value); } //Perform a compare and set operation on the sequence. //@param expectedValue The expected current value. //@param newValue The value to update to. 
//@return true if the operation succeeds, false otherwise. public boolean compareAndSet(final long expectedValue, final long newValue) { return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue); } //Atomically increment the sequence by one. //@return The value after the increment public long incrementAndGet() { return addAndGet(1L); } //Atomically add the supplied value. //@param increment The value to add to the sequence. //@return The value after the increment. public long addAndGet(final long increment) { long currentValue; long newValue; do { currentValue = get(); newValue = currentValue + increment; } while (!compareAndSet(currentValue, newValue)); return newValue; } @Override public String toString() { return Long.toString(get()); } }
MultiProducerSequencer的publish()方法在发布事件消息时,则会通过Unsafe设置sequence在int数组availableBuffer中对应元素的值,然后同样通过等待策略通知阻塞的消费者。
public final class MultiProducerSequencer extends AbstractSequencer { private static final Unsafe UNSAFE = Util.getUnsafe(); private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); private final int[] availableBuffer; private final int indexMask; private final int indexShift; //Construct a Sequencer with the selected wait strategy and buffer size. //@param bufferSize the size of the buffer that this will sequence over. //@param waitStrategy for those waiting on sequences. public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); availableBuffer = new int[bufferSize]; indexMask = bufferSize - 1; indexShift = Util.log2(bufferSize); initialiseAvailableBuffer(); } private void initialiseAvailableBuffer() { for (int i = availableBuffer.length - 1; i != 0; i--) { setAvailableBufferValue(i, -1); } setAvailableBufferValue(0, -1); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); } @Override public void publish(final long sequence) { setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); } //The below methods work on the availableBuffer flag. //The prime reason is to avoid a shared sequence object between publisher threads. //(Keeping single pointers tracking start and end would require coordination between the threads). //-- Firstly we have the constraint that the delta between the cursor and minimum gating sequence //will never be larger than the buffer size (the code in next/tryNext in the Sequence takes care of that). //-- Given that; take the sequence value and mask off the lower portion of the sequence //as the index into the buffer (indexMask). (aka modulo operator) //-- The upper portion of the sequence becomes the value to check for availability. 
//ie: it tells us how many times around the ring buffer we've been (aka division) //-- Because we can't wrap without the gating sequences moving forward //(i.e. the minimum gating sequence is effectively our last available position in the buffer), //when we have new data and successfully claimed a slot we can simply write over the top. private void setAvailable(final long sequence) { setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } private int calculateIndex(final long sequence) { return ((int) sequence) & indexMask; } private int calculateAvailabilityFlag(final long sequence) { return (int) (sequence >>> indexShift); } @Override public long next() { return next(1); } @Override public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); continue; } gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; } ... }
(2)通过Translator事件转换器发布消息
生产者还可以直接调用RingBuffer的tryPublishEvent()方法来完成发布事件消息到RingBuffer。该方法首先会调用Sequencer接口的tryNext()方法获取sequence序号(如果RingBuffer容量不足,tryNext()会抛出InsufficientCapacityException,此时tryPublishEvent()返回false),接着调用RingBuffer的translateAndPublish()方法:先根据该sequence序号从RingBuffer的环形数组中获取对应的元素,再通过EventTranslator的translateTo()方法将事件消息赋值替换到该元素中,最后调用Sequencer接口的publish()方法设置当前生产者的sequence序号来完成事件消息的发布。
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7; } abstract class RingBufferFields<E> extends RingBufferPad { ... private static final Unsafe UNSAFE = Util.getUnsafe(); private final long indexMask; //环形数组存储事件消息 private final Object[] entries; protected final int bufferSize; //RingBuffer的sequencer属性代表了当前线程对应的生产者 protected final Sequencer sequencer; RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //初始化数组 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //内存预加载 fill(eventFactory); } private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } ... } public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { //值为-1 public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7; //Construct a RingBuffer with the full option set. //@param eventFactory to newInstance entries for filling the RingBuffer //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer. 
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) { super(eventFactory, sequencer); } @Override public boolean tryPublishEvent(EventTranslator<E> translator) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence); return true; } catch (InsufficientCapacityException e) { return false; } } private void translateAndPublish(EventTranslator<E> translator, long sequence) { try { translator.translateTo(get(sequence), sequence); } finally { sequencer.publish(sequence); } } //Get the event for a given sequence in the RingBuffer. //This call has 2 uses. //Firstly use this call when publishing to a ring buffer. //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long). //Secondly use this call when consuming data from the ring buffer. //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method. //@param sequence for the event //@return the event for the given sequence @Override public E get(long sequence) { //调用父类RingBufferFields的elementAt()方法 return elementAt(sequence); } ... } public abstract class AbstractSequencer implements Sequencer { private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); //环形数组的大小 protected final int bufferSize; //等待策略 protected final WaitStrategy waitStrategy; //当前生产者的进度 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler) //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时, //由RingBuffer的addGatingSequences()方法进行添加 protected volatile Sequence[] gatingSequences = new Sequence[0]; ... 
//Create with the specified buffer size and wait strategy. //@param bufferSize The total number of entries, must be a positive power of 2. //@param waitStrategy public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) { if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.bufferSize = bufferSize; this.waitStrategy = waitStrategy; } ... } public final class SingleProducerSequencer extends SingleProducerSequencerFields { protected long p1, p2, p3, p4, p5, p6, p7; //Construct a Sequencer with the selected wait strategy and buffer size. //@param bufferSize the size of the buffer that this will sequence over. //@param waitStrategy for those waiting on sequences. public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } ... @Override public long tryNext() throws InsufficientCapacityException { return tryNext(1); } @Override public long tryNext(int n) throws InsufficientCapacityException { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } if (!hasAvailableCapacity(n, true)) { throw InsufficientCapacityException.INSTANCE; } long nextSequence = this.nextValue += n; return nextSequence; } private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) { long nextValue = this.nextValue; long wrapPoint = (nextValue + requiredCapacity) - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { if (doStore) { cursor.setVolatile(nextValue);//StoreLoad fence } long minSequence = Util.getMinimumSequence(gatingSequences, nextValue); this.cachedValue = minSequence; if (wrapPoint > minSequence) { return false; } } return true; } @Override public void publish(long sequence) { //设置当前生产者的sequence cursor.set(sequence); //通过等待策略通知阻塞的消费者 
waitStrategy.signalAllWhenBlocking(); } ... } abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad { SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } //表示生产者的当前序号,值为-1 protected long nextValue = Sequence.INITIAL_VALUE; //表示消费者的最小序号,值为-1 protected long cachedValue = Sequence.INITIAL_VALUE; } abstract class SingleProducerSequencerPad extends AbstractSequencer { protected long p1, p2, p3, p4, p5, p6, p7; SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } } public final class MultiProducerSequencer extends AbstractSequencer { ... @Override public long tryNext() throws InsufficientCapacityException { return tryNext(1); } @Override public long tryNext(int n) throws InsufficientCapacityException { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; if (!hasAvailableCapacity(gatingSequences, n, current)) { throw InsufficientCapacityException.INSTANCE; } } while (!cursor.compareAndSet(current, next)); return next; } private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) { long wrapPoint = (cursorValue + requiredCapacity) - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) { long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue); gatingSequenceCache.set(minSequence); if (wrapPoint > minSequence) { return false; } } return true; } @Override public void publish(final long sequence) { setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); } ... } //Implementations translate (write) data representations into events claimed from the RingBuffer. //When publishing to the RingBuffer, provide an EventTranslator. 
//The RingBuffer will select the next available event by sequence and provide it to the EventTranslator (which should update the event), //before publishing the sequence update. //@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event. public interface EventTranslator<T> { //Translate a data representation into fields set in given event //@param event into which the data should be translated. //@param sequence that is assigned to event. void translateTo(T event, long sequence); }
2.Disruptor的消费者源码分析
Disruptor的消费者主要由BatchEventProcessor类和WorkProcessor类来实现,并通过Disruptor的handleEventsWith()方法或者handleEventsWithWorkerPool()方法和start()方法来启动。
执行Disruptor的handleEventsWith()方法绑定消费者时,会创建BatchEventProcessor对象,并将其添加到Disruptor的consumerRepository属性。
执行Disruptor的handleEventsWithWorkerPool()方法绑定消费者时,则会创建WorkProcessor对象,并将该对象添加到Disruptor的consumerRepository属性。
执行Disruptor的start()方法启动Disruptor实例时,便会通过线程池执行BatchEventProcessor里的run()方法,或者通过线程池执行WorkProcessor里的run()方法。
执行BatchEventProcessor的run()方法时,会通过修改BatchEventProcessor的sequence来实现消费RingBuffer的数据。
执行WorkProcessor的run()方法时,会通过修改WorkProcessor的sequence来实现消费RingBuffer的数据。
public class Main { public static void main(String[] args) { //参数准备 OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 4; ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //参数一:eventFactory,消息(Event)工厂对象 //参数二:ringBufferSize,容器的长度 //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler //参数四:ProducerType,单生产者还是多生产者 //参数五:waitStrategy,等待策略 //1.实例化Disruptor对象 Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>( orderEventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy() ); //2.添加Event处理器,用于处理事件 //也就是构建Disruptor与消费者的一个关联关系 //方式一:使用handleEventsWith()方法 disruptor.handleEventsWith(new OrderEventHandler()); //方式二:使用handleEventsWithWorkerPool()方法 //disruptor.handleEventsWithWorkerPool(workHandlers); //3.启动disruptor disruptor.start(); //4.获取实际存储数据的容器: RingBuffer RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 5; i++) { bb.putLong(0, i); //向容器中投递数据 producer.sendData(bb); } disruptor.shutdown(); executor.shutdown(); } }
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    //Binds consumers: registers the EventHandlers and creates an EventProcessor for each of them.
    //Set up event handlers to handle events from the ring buffer.
    //These handlers will process events as soon as they become available, in parallel.
    //This method can be used as the start of a chain.
    //For example if the handler A must process events before handler B: dw.handleEventsWith(A).then(B);
    //@param handlers the event handlers that will process events.
    //@return a EventHandlerGroup that can be used to chain dependencies.
    @SuppressWarnings("varargs")
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }

    //Creates one BatchEventProcessor per handler and adds it to consumerRepository.
    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            //Create the BatchEventProcessor.
            final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            //Add the BatchEventProcessor to consumerRepository.
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            //One consumer thread corresponds to one batchEventProcessor.
            //Each batchEventProcessor holds a Sequence that represents the consumption progress of its consumer thread.
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        //Add the Sequence held by every consumer thread to the gatingSequences (a Sequence[]) of the producer's Sequencer.
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    private void checkNotStarted() {
        //The started switch is an AtomicBoolean; this check only reads it
        //(the compare-and-set happens in checkOnlyStartedOnce()).
        if (started.get()) {
            throw new IllegalStateException("All event handlers must be added before calling starts.");
        }
    }
    ...

    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            //When Disruptor.handleEventsWith() runs and calls Disruptor.createEventProcessors(),
            //each newly created BatchEventProcessor is wrapped in an EventProcessorInfo (a ConsumerInfo)
            //and added to consumerRepository through add().
            //Therefore the call below ends up in EventProcessorInfo.start().
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

    private void checkOnlyStartedOnce() {
        //The started switch is flipped with a CAS so that only one caller can start the Disruptor.
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    //Adds a BatchEventProcessor to the repository.
    public void add(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        //Wrap the given BatchEventProcessor in an EventProcessorInfo, i.e. a ConsumerInfo.
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
    ...
}

class EventProcessorInfo<T> implements ConsumerInfo {
    private final EventProcessor eventprocessor;
    private final EventHandler<? super T> handler;
    private final SequenceBarrier barrier;
    private boolean endOfChain = true;

    EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        this.eventprocessor = eventprocessor;
        this.handler = handler;
        this.barrier = barrier;
    }
    ...

    @Override
    public void start(final Executor executor) {
        //Run the BatchEventProcessor's run() method on the given thread pool,
        //which is the pool passed in when the Disruptor was constructed.
        executor.execute(eventprocessor);
    }
    ...
}

//Convenience class for handling the batching semantics of consuming entries from
//a RingBuffer and delegating the available events to an EventHandler.
//If the EventHandler also implements LifecycleAware it will be notified just after
//the thread is started and just before the thread is shutdown.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class BatchEventProcessor<T> implements EventProcessor {
    //Lifecycle states stored in `running`. run() moves IDLE -> RUNNING and resets to IDLE on exit.
    //HALTED is the state mentioned by the comment in run(); the code that sets it is elided from this excerpt.
    private static final int IDLE = 0;
    private static final int HALTED = IDLE + 1;
    private static final int RUNNING = HALTED + 1;

    //An int state (not a boolean) because run() compares it against IDLE and RUNNING.
    private final AtomicInteger running = new AtomicInteger(IDLE);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;
    //Non-null only when the handler also implements BatchStartAware; consulted by processEvents().
    private final BatchStartAware batchStartAware;

    //Construct a EventProcessor that will automatically track the progress by
    //updating its sequence when the EventHandler#onEvent(Object, long, boolean) method returns.
    //@param dataProvider to which events are published.
    //@param sequenceBarrier on which it is waiting.
    //@param eventHandler is the delegate to which events are dispatched.
    public BatchEventProcessor(final DataProvider<T> dataProvider, final SequenceBarrier sequenceBarrier, final EventHandler<? super T> eventHandler) {
        //The dataProvider passed in by Disruptor.createEventProcessors() is the Disruptor's ringBuffer.
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;
        if (eventHandler instanceof SequenceReportingEventHandler) {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }
        batchStartAware = (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }
    ...

    //It is ok to have another thread rerun this method after a halt().
    //Consumes the RingBuffer's data by advancing this processor's sequence.
    @Override
    public void run() {
        if (running.compareAndSet(IDLE, RUNNING)) {
            sequenceBarrier.clearAlert();
            notifyStart();
            try {
                if (running.get() == RUNNING) {
                    processEvents();
                }
            } finally {
                notifyShutdown();
                running.set(IDLE);
            }
        } else {
            //This is a little bit of guess work.
            //The running state could have changed to HALTED by this point.
            //However, Java does not have compareAndExchange which is the only way to get it exactly correct.
            if (running.get() == RUNNING) {
                throw new IllegalStateException("Thread is already running");
            } else {
                earlyExit();
            }
        }
    }

    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;
        while (true) {
            try {
                //sequenceBarrier.waitFor() decides whether the consumer has to wait for the producer to publish.
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                while (nextSequence <= availableSequence) {
                    //Fetch the event to consume from the RingBuffer.
                    event = dataProvider.get(nextSequence);
                    //Invoke the consumer's onEvent() implementation to consume it.
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                //Record the consumer's current progress.
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                //Report the failure, then mark the failing sequence as consumed and move on.
                handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

    private void earlyExit() {
        notifyStart();
        notifyShutdown();
    }

    private void notifyTimeout(final long availableSequence) {
        try {
            if (timeoutHandler != null) {
                timeoutHandler.onTimeout(availableSequence);
            }
        } catch (Throwable e) {
            handleEventException(e, availableSequence, null);
        }
    }

    //Notifies the EventHandler when this processor is starting up
    private void notifyStart() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onStart();
            } catch (final Throwable ex) {
                handleOnStartException(ex);
            }
        }
    }

    //Notifies the EventHandler immediately prior to this processor shutting down
    private void notifyShutdown() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onShutdown();
            } catch (final Throwable ex) {
                handleOnShutdownException(ex);
            }
        }
    }
    ...
}
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    //Registers the WorkHandlers and creates a WorkProcessor for each of them.
    //Set up a WorkerPool to distribute an event to one of a pool of work handler threads.
    //Each event will only be processed by one of the work handlers.
    //The Disruptor will automatically start this processors when #start() is called.
    //@param workHandlers the work handlers that will process events.
    //@return a {@link EventHandlerGroup} that can be used to chain dependencies.
    @SafeVarargs
    @SuppressWarnings("varargs")
    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {
        return createWorkerPool(new Sequence[0], workHandlers);
    }

    //Creates the WorkerPool and adds it to consumerRepository.
    EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        //Create the WorkerPool, which in turn creates one WorkProcessor per WorkHandler.
        final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        //Add the WorkerPool to consumerRepository.
        consumerRepository.add(workerPool, sequenceBarrier);
        final Sequence[] workerSequences = workerPool.getWorkerSequences();
        //Add the Sequence held by every consumer thread to the gatingSequences (a Sequence[]) of the producer's Sequencer.
        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
        return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
    }

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...

    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            //When Disruptor.handleEventsWithWorkerPool() runs and calls Disruptor.createWorkerPool(),
            //the newly created WorkerPool is wrapped in a WorkerPoolInfo (a ConsumerInfo)
            //and added to consumerRepository through add().
            //Therefore the call below ends up in WorkerPoolInfo.start().
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

    private void checkOnlyStartedOnce() {
        //The started switch is flipped with a CAS so that only one caller can start the Disruptor.
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    //Adds a WorkerPool to the repository and indexes it under each of its worker sequences.
    public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<>(workerPool, sequenceBarrier);
        consumerInfos.add(workerPoolInfo);
        for (Sequence sequence : workerPool.getWorkerSequences()) {
            eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
        }
    }
    ...
}

//ConsumerInfo wrapper around a WorkerPool; start() delegates to the pool.
class WorkerPoolInfo<T> implements ConsumerInfo {
    private final WorkerPool<T> workerPool;
    private final SequenceBarrier sequenceBarrier;
    private boolean endOfChain = true;

    WorkerPoolInfo(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        this.workerPool = workerPool;
        this.sequenceBarrier = sequenceBarrier;
    }

    @Override
    public void start(Executor executor) {
        workerPool.start(executor);
    }
    ...
}

public final class WorkerPool<T> {
    private final AtomicBoolean started = new AtomicBoolean(false);
    //Shared by all WorkProcessors of this pool: each one claims its next event by CAS-ing this sequence.
    private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    //WorkProcessors are created to wrap each of the provided WorkHandlers
    private final WorkProcessor<?>[] workProcessors;

    //Create a worker pool to enable an array of WorkHandlers to consume published sequences.
    //This option requires a pre-configured RingBuffer which must have RingBuffer#addGatingSequences(Sequence...) called before the work pool is started.
    //@param ringBuffer of events to be consumed.
    //@param sequenceBarrier on which the workers will depend.
    //@param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
    //@param workHandlers to distribute the work load across.
    @SafeVarargs
    public WorkerPool(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... workHandlers) {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        //Create one WorkProcessor per WorkHandler.
        workProcessors = new WorkProcessor[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
            workProcessors[i] = new WorkProcessor<>(ringBuffer, sequenceBarrier, workHandlers[i], exceptionHandler, workSequence);
        }
    }

    //Start the worker pool processing events in sequence.
    //@param executor providing threads for running the workers.
    //@return the {@link RingBuffer} used for the work queue.
    //@throws IllegalStateException if the pool has already been started and not halted yet
    public RingBuffer<T> start(final Executor executor) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }
        //Align the shared work sequence and every worker's own sequence with the current cursor before starting.
        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);
        for (WorkProcessor<?> processor : workProcessors) {
            processor.getSequence().set(cursor);
            //Run the WorkProcessor's run() method on the given thread pool.
            executor.execute(processor);
        }
        return ringBuffer;
    }
    ...
}

public final class WorkProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    private final SequenceBarrier sequenceBarrier;
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    private final Sequence workSequence;
    //Handed to EventReleaseAware handlers; release() sets this processor's sequence to Long.MAX_VALUE.
    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };
    private final TimeoutHandler timeoutHandler;

    //Construct a {@link WorkProcessor}.
    //@param ringBuffer to which events are published.
    //@param sequenceBarrier on which it is waiting.
    //@param workHandler is the delegate to which events are dispatched.
    //@param exceptionHandler to be called back when an error occurs
    //@param workSequence from which to claim the next event to be worked on. It should always be initialised as Sequencer#INITIAL_CURSOR_VALUE
    public WorkProcessor(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final WorkHandler<? super T> workHandler, final ExceptionHandler<? super T> exceptionHandler, final Sequence workSequence) {
        this.ringBuffer = ringBuffer;
        this.sequenceBarrier = sequenceBarrier;
        this.workHandler = workHandler;
        this.exceptionHandler = exceptionHandler;
        this.workSequence = workSequence;
        if (this.workHandler instanceof EventReleaseAware) {
            ((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser);
        }
        timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null;
    }

    //Consumes the RingBuffer's data by advancing this processor's sequence.
    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();
        notifyStart();
        //true means the previously claimed sequence is done and a new one must be claimed.
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true) {
            try {
                if (processedSequence) {
                    processedSequence = false;
                    //Claim the next sequence from the shared workSequence; retry until the CAS succeeds.
                    do {
                        nextSequence = workSequence.get() + 1L;
                        //Record the consumer's current progress.
                        sequence.set(nextSequence - 1L);
                    } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                if (cachedAvailableSequence >= nextSequence) {
                    //Fetch the event to consume from the RingBuffer.
                    event = ringBuffer.get(nextSequence);
                    //Invoke the consumer's onEvent() implementation to consume it.
                    workHandler.onEvent(event);
                    processedSequence = true;
                } else {
                    //sequenceBarrier.waitFor() decides whether the consumer has to wait for the producer to publish.
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                //handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }
        notifyShutdown();
        running.set(false);
    }
    ...
}
//Excerpt tracing how consumer sequences reach the producer's gating sequences.
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;

    //Adds the new processors' sequences as gating sequences and removes the barrier sequences
    //they depend on, then un-marks those barrier sequences as end-of-chain in consumerRepository.
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...
}

//NOTE(review): p1..p7 are unused here; presumably cache-line padding against false sharing - confirm.
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private final long indexMask;
    //The ring array that stores the event messages.
    private final Object[] entries;
    protected final int bufferSize;
    //The sequencer field is the producer-side sequencer of this RingBuffer.
    protected final Sequencer sequencer;

    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        //A power-of-two size lets elementAt() map a sequence to a slot with a bit mask.
        this.indexMask = bufferSize - 1;
        //Allocate the array, with BUFFER_PAD extra slots on each side.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Memory pre-loading: create every event object up front.
        fill(eventFactory);
    }

    //Populates each usable slot (after the leading pad) with a new event from the factory.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    //Returns the event stored in the slot selected by (sequence & indexMask).
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    public void addGatingSequences(Sequence... gatingSequences) {
        sequencer.addGatingSequences(gatingSequences);
    }
    ...
}

public interface Sequencer extends Cursored, Sequenced {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    void addGatingSequences(Sequence... gatingSequences);
    ...
}

public abstract class AbstractSequencer implements Sequencer {
    //Field updater used to swap the "gatingSequences" array with a CAS.
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    ...

    @Override
    public final void addGatingSequences(Sequence... gatingSequences) {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }
    ...
}

class SequenceGroups {
    //Appends sequencesToAdd to the holder's Sequence[] field via a copy-and-CAS retry loop,
    //setting each added sequence to the cursor's current value.
    static <T> void addSequences(final T holder, final AtomicReferenceFieldUpdater<T, Sequence[]> updater, final Cursored cursor, final Sequence... sequencesToAdd) {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;
        do {
            currentSequences = updater.get(holder);
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = cursor.getCursor();
            int index = currentSequences.length;
            for (Sequence sequence : sequencesToAdd) {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        } while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
        //Re-read the cursor after the array swap and set the added sequences to it again.
        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd) {
            sequence.set(cursorSequence);
        }
    }
    ...
}
3.Disruptor的WaitStrategy等待策略分析
在生产者发布消息时,会调用WaitStrategy的signalAllWhenBlocking()方法唤醒阻塞的消费者。在消费者消费消息时,会调用WaitStrategy的waitFor()方法阻塞消费过快的消费者。
当然,不同的策略不一定就是阻塞消费者,比如BlockingWaitStrategy会通过ReentrantLock来阻塞消费者,而YieldingWaitStrategy则通过yield切换线程来实现让消费者无锁等待,即通过Thread的yield()方法切换线程让另一个线程继续执行自旋判断操作。
所以YieldingWaitStrategy等待策略的效率是最高的 + 最耗费CPU资源,当然效率次高、比较耗费CPU资源的是BusySpinWaitStrategy等待策略。
Disruptor提供了如下几种等待策略:
一.完全阻塞的等待策略BlockingWaitStrategy 二.切换线程自旋的等待策略YieldingWaitStrategy 三.繁忙自旋的等待策略BusySpinWaitStrategy 四.轻微阻塞的等待策略LiteBlockingWaitStrategy 也就是唤醒阻塞线程时,通过CAS避免并发获取锁的等待策略 五.最小睡眠 + 切换线程的等待策略SleepingWaitStrategy
总结:
为了达到最高效率,有大量CPU资源,可切换线程让多个线程自旋判断 为了保证高效的同时兼顾CPU资源,可以让单个线程自旋判断 为了保证比较高效更加兼顾CPU资源,可以切换线程自旋 + 最少睡眠 为了完全兼顾CPU资源不考虑效率问题,可以采用重入锁实现阻塞唤醒 为了完全兼顾CPU资源但考虑一点效率,可以采用重入锁 + CAS唤醒
//Fully blocking wait strategy.
//Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier.
//This strategy can be used when throughput and low-latency are not as important as CPU resource.
public final class BlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        //Block on the condition only while the cursor is still behind the requested sequence.
        if ((availableSequence = cursorSequence.get()) < sequence) {
            lock.lock();
            try {
                while ((availableSequence = cursorSequence.get()) < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            } finally {
                lock.unlock();
            }
        }
        //Then spin until the dependent sequence has reached the requested sequence as well.
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    //Wakes every waiter blocked on the condition in waitFor().
    @Override
    public void signalAllWhenBlocking() {
        lock.lock();
        try {
            processorNotifyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

//Wait strategy that spins and then yields the thread.
//Yielding strategy that uses a Thread.yield() for EventProcessors waiting on a barrier after an initially spinning.
//This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
public final class YieldingWaitStrategy implements WaitStrategy {
    private static final int SPIN_TRIES = 100;

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        int counter = SPIN_TRIES;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }
        return availableSequence;
    }

    //No-op: waitFor() never blocks on a lock, so there is nothing to signal.
    @Override
    public void signalAllWhenBlocking() {
    }

    //Counts the spin budget down to zero; once it is exhausted every further call yields the thread.
    private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
        barrier.checkAlert();
        if (0 == counter) {
            //Yield so that another thread can run; this thread resumes its spin check afterwards.
            Thread.yield();
        } else {
            --counter;
        }
        return counter;
    }
}

//Busy-spin wait strategy.
//Busy Spin strategy that uses a busy spin loop for EventProcessors waiting on a barrier.
//This strategy will use CPU resource to avoid syscalls which can introduce latency jitter.
//It is best used when threads can be bound to specific CPU cores.
public final class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    //No-op: waitFor() never blocks on a lock, so there is nothing to signal.
    @Override
    public void signalAllWhenBlocking() {
    }
}

//Lightly blocking wait strategy (skips taking the lock on wake-up when no waiter asked for a signal).
//Variation of the BlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
//Shows performance improvements on microbenchmarks.
//However this wait strategy should be considered experimental as I have not full proved the correctness of the lock elision code.
public final class LiteBlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();
    //Set by a waiter before it blocks; cleared by signalAllWhenBlocking(), which only locks when it was set.
    private final AtomicBoolean signalNeeded = new AtomicBoolean(false);

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        if ((availableSequence = cursorSequence.get()) < sequence) {
            lock.lock();
            try {
                do {
                    //Announce that a signal is needed, then re-check the cursor before actually blocking.
                    signalNeeded.getAndSet(true);
                    if ((availableSequence = cursorSequence.get()) >= sequence) {
                        break;
                    }
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                } while ((availableSequence = cursorSequence.get()) < sequence);
            } finally {
                lock.unlock();
            }
        }
        //Then spin until the dependent sequence has reached the requested sequence as well.
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
        //Only take the lock and signal when a waiter has flagged that it needs waking.
        if (signalNeeded.getAndSet(false)) {
            lock.lock();
            try {
                processorNotifyCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

//Wait strategy that spins, then yields, then sleeps for the minimum time: SleepingWaitStrategy.
//Sleeping strategy that initially spins, then uses a Thread.yield(),
//and eventually sleep LockSupport.parkNanos(1) for the minimum number of nanos the OS
//and JVM will allow while the EventProcessors are waiting on a barrier.
//This strategy is a good compromise between performance and CPU resource.
//Latency spikes can occur after quiet periods.
public final class SleepingWaitStrategy implements WaitStrategy {
    private static final int DEFAULT_RETRIES = 200;
    private final int retries;

    public SleepingWaitStrategy() {
        this(DEFAULT_RETRIES);
    }

    public SleepingWaitStrategy(int retries) {
        this.retries = retries;
    }

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        int counter = retries;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }
        return availableSequence;
    }

    //No-op: waitFor() never blocks on a lock, so there is nothing to signal.
    @Override
    public void signalAllWhenBlocking() {
    }

    //Three phases keyed on the remaining counter: above 100 just spin, 1..100 spin and yield,
    //and at 0 park for one nanosecond per call.
    private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
        barrier.checkAlert();
        if (counter > 100) {
            --counter;
        } else if (counter > 0) {
            --counter;
            Thread.yield();
        } else {
            LockSupport.parkNanos(1L);
        }
        return counter;
    }
}
4.Disruptor的高性能原因
一.使用了环形结构 + 数组 + 内存预加载
二.使用了单线程写的方式并配合内存屏障
三.消除伪共享(填充缓存行)
四.序号栅栏和序号配合使用来消除锁
五.提供了多种不同性能的等待策略
5.Disruptor高性能之数据结构(内存预加载机制)
(1)RingBuffer使用环形数组来存储元素
(2)RingBuffer采用了内存预加载机制
(1)RingBuffer使用环形数组来存储元素
环形数组可以避免数组扩容和缩容带来的性能损耗。
(2)RingBuffer采用了内存预加载机制
初始化RingBuffer时,会将entries数组里的每一个元素都先new出来。比如RingBuffer的大小设置为8,那么初始化RingBuffer时,就会先将entries数组的8个元素分别指向新new出来的空的Event对象。往RingBuffer填充元素时,只是将对应的Event对象进行赋值。所以RingBuffer中的Event对象是一直存活着的,也就是说它能最大程度地降低系统GC频率,从而提升性能。
//Demo entry point: one producer and one consumer exchanging OrderEvent objects through a Disruptor.
public class Main {
    public static void main(String[] args) {
        //Prepare the arguments
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        //Argument 1: eventFactory, the factory that creates the event (message) objects
        //Argument 2: ringBufferSize, the capacity of the container
        //Argument 3: executor, the thread pool (a custom pool is recommended), RejectedExecutionHandler
        //Argument 4: ProducerType, single producer or multiple producers
        //Argument 5: waitStrategy, the wait strategy

        //1. Instantiate the Disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );

        //2. Register the event handler that will process events,
        //i.e. wire the Disruptor to its consumer
        disruptor.handleEventsWith(new OrderEventHandler());

        //3. Start the Disruptor
        disruptor.start();

        //4. Obtain the container that actually stores the data: the RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            //Publish the data into the container
            producer.sendData(bb);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}

//Excerpt of Disruptor: only the construction path that creates the RingBuffer is shown.
public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    ...

    //Create a new Disruptor.
    //@param eventFactory the factory to create events in the ring buffer.
    //@param ringBufferSize the size of the ring buffer, must be power of 2.
    //@param executor an Executor to execute event processors.
    //@param producerType the claim strategy to use for the ring buffer.
    //@param waitStrategy the wait strategy to use for the ring buffer.
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    //Private constructor helper
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
    ...
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //Same value as Sequence.INITIAL_VALUE (-1 according to the article)
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    //Seven unused long fields declared after the inherited fields (padding)
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
    public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    //Create a new single producer RingBuffer with the specified wait strategy.
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    //Construct a RingBuffer with the full option set.
    //@param eventFactory to newInstance entries for filling the RingBuffer
    //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer.
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    ...
}

//Holds the actual state of the ring buffer: the slot array, its size and the sequencer.
abstract class RingBufferFields<E> extends RingBufferPad {
    //bufferSize - 1
    private final long indexMask;
    //Ring array that stores the event messages
    private final Object[] entries;
    protected final int bufferSize;
    //The sequencer of this ring buffer; per the article it represents the producer
    protected final Sequencer sequencer;
    ...

    //Validates the buffer size, allocates the slot array and pre-populates every slot.
    //@throws IllegalArgumentException if the sequencer's buffer size is < 1 or not a power of 2
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //A power of two has exactly one bit set
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //Allocate the array: bufferSize live slots plus BUFFER_PAD extra slots on each side
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Memory pre-allocation: create every event object up front
        fill(eventFactory);
    }

    //Stores one freshly created event in each live slot, starting at offset BUFFER_PAD.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            //Install an "empty" event object that producers fill in later
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    ...
}

//Seven unused long fields declared ahead of the RingBufferFields state (padding).
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
6.Disruptor高性能之内核(使用单线程写)
Disruptor的RingBuffer之所以可以做到完全无锁是因为单线程写。离开单线程写,没有任何技术可以做到完全无锁。Redis和Netty等高性能技术框架也是利用单线程写来实现的。
具体就是:单生产者时,固然只有一个生产者线程在写。多生产者时,每个生产者线程都只会写各自获取到的Sequence序号对应的环形数组的元素,从而使得多个生产者线程相互之间不会产生写冲突。
7.Disruptor高性能之系统内存优化(内存屏障)
要正确实现无锁,还需要另外一个关键技术——内存屏障。对应到Java语言,就是volatile变量与Happens Before语义。
内存屏障:Linux的smp_wmb()/smp_rmb()。
8.Disruptor高性能之系统缓存优化(消除伪共享)
CPU缓存是以缓存行(Cache Line)为单位进行存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节,最常见的缓存行大小是64个字节。
当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会对这个缓存行形成竞争,从而无意中影响彼此性能,这就是伪共享。
消除伪共享:利用了空间换时间的思想。
由于代表着一个序号的Sequence其核心字段value是一个long型变量(占8个字节),所以有可能会出现多个Sequence对象的value变量共享同一个缓存行。因此,需要对Sequence对象的value变量消除伪共享。具体做法就是:在Sequence对象的value变量前后各增加7个long型变量。
注意:伪共享与Sequence的静态变量无关,因为静态变量本身就是多个线程共享的,而不是多个线程隔离独立的。
//Seven long fields declared in a superclass of Value, i.e. ahead of the value field in the class hierarchy.
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

//Holds the only meaningful field: the sequence value itself.
class Value extends LhsPadding {
    protected volatile long value;
}

//Seven long fields declared in a subclass of Value, i.e. after the value field in the class hierarchy.
class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

//A long sequence value with padding fields declared before and after it (the article's false-sharing countermeasure).
public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    //Offset of Value.value, resolved once in the static initializer and used for the Unsafe writes below
    private static final long VALUE_OFFSET;

    static {
        UNSAFE = Util.getUnsafe();
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    //Create a sequence initialised to -1.
    public Sequence() {
        this(INITIAL_VALUE);
    }

    //Create a sequence with a specified initial value.
    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    //Perform a volatile read of this sequence's value.
    public long get() {
        return value;
    }

    //Perform an ordered write of this sequence.
    //The intent is a Store/Store barrier between this write and any previous store.
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }
    ...
}
9.Disruptor高性能之序号获取优化(自旋 + CAS)
生产者投递Event时会使用"long sequence = ringBuffer.next()"获取序号,而序号栅栏SequenceBarrier会和序号Sequence搭配起来一起使用,用来协调和管理消费者和生产者的工作节奏,避免锁的使用。
各个消费者和生产者都持有自己的序号,这些序号需满足如下条件以避免生产者速度过快,将还没来得及消费的消息覆盖。
一.消费者序号数值不能大于生产者序号数值 二.消费者序号数值不能大于其前置消费者的序号数值 三.生产者序号数值不能比消费者中最小的序号数值领先超过bufferSize(即生产者不能绕环一圈追上最慢的消费者)
高性能的序号获取优化:为避免生产者每次执行next()获取序号时,都要查询消费者的最小序号,Disruptor采取了自旋 + LockSupport挂起线程 + 缓存最小序号 + CAS来优化。既避免了锁,也尽量在不耗费CPU的情况下提升了性能。
单生产者的情况下,只有一个线程添加元素,此时没必要使用锁。多生产者的情况下,会有多个线程并发获取Sequence序号添加元素,此时会通过自旋 + CAS避免锁。
//Demo producer: claims a sequence, fills the pre-allocated event in that slot and publishes it.
public class OrderEventProducer {
    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    //Publishes the long stored at position 0 of data as one OrderEvent.
    public void sendData(ByteBuffer data) {
        //1. When publishing, first claim an available sequence from the ring buffer
        long sequence = ringBuffer.next();
        try {
            //2. Use the sequence to locate the concrete OrderEvent element
            //Note: the OrderEvent obtained here is an "empty" object that has not been assigned yet
            OrderEvent event = ringBuffer.get(sequence);
            //3. Do the actual assignment
            event.setValue(data.getLong(0));
        } finally {
            //4. Publish; done in finally so that a claimed sequence is always published
            ringBuffer.publish(sequence);
        }
    }
}

//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    //Same value as Sequence.INITIAL_VALUE (-1 according to the article)
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    //Increment and return the next sequence for the ring buffer.
    //Calls of this method should ensure that they always publish the sequence afterward.
    //E.g.
    //  long sequence = ringBuffer.next();
    //  try {
    //      Event e = ringBuffer.get(sequence);
    //      ...
    //  } finally {
    //      ringBuffer.publish(sequence);
    //  }
    //@return The next sequence to publish to.
    @Override
    public long next() {
        return sequencer.next();
    }

    //Publish the specified sequence.
    //This action marks this particular message as being available to be read.
    //@param sequence the sequence to publish.
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }

    //Get the event for a given sequence in the RingBuffer.
    //This call has 2 uses.
    //Firstly use this call when publishing to a ring buffer.
    //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long).
    //Secondly use this call when consuming data from the ring buffer.
    //After calling SequenceBarrier#waitFor(long) call this method with any value greater than
    //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method.
    //@param sequence for the event
    //@return the event for the given sequence
    @Override
    public E get(long sequence) {
        //Delegates to elementAt() of the parent class RingBufferFields
        return elementAt(sequence);
    }
    ...
}

//Seven unused long fields declared ahead of the RingBufferFields state (padding).
abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

//Holds the actual state of the ring buffer: the slot array, its size and the sequencer.
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    //bufferSize - 1; used by elementAt() to mask a sequence
    private final long indexMask;
    //Ring array that stores the event messages
    private final Object[] entries;
    protected final int bufferSize;
    //The sequencer of this ring buffer; per the article it represents the producer
    protected final Sequencer sequencer;

    //Validates the buffer size, allocates the slot array and pre-populates every slot.
    //@throws IllegalArgumentException if the sequencer's buffer size is < 1 or not a power of 2
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        //A power of two has exactly one bit set
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //Allocate the array: bufferSize live slots plus BUFFER_PAD extra slots on each side
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Memory pre-allocation: create every event object up front
        fill(eventFactory);
    }

    //Stores one freshly created event in each live slot, starting at offset BUFFER_PAD.
    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    //Reads the slot for a sequence directly through Unsafe.
    //(sequence & indexMask) is the sequence masked with bufferSize - 1; it is then shifted by
    //REF_ELEMENT_SHIFT and added to REF_ARRAY_BASE to form the offset into entries.
    //NOTE(review): REF_ARRAY_BASE and REF_ELEMENT_SHIFT are declared in the elided part;
    //presumably REF_ARRAY_BASE already skips the BUFFER_PAD leading slots - confirm.
    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}
//Common state shared by the single-producer and multi-producer sequencers.
public abstract class AbstractSequencer implements Sequencer {
    //Field updater for the gatingSequences field
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    //Size of the ring array
    protected final int bufferSize;
    //Wait strategy
    protected final WaitStrategy waitStrategy;
    //Progress of the producer
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    //Each Sequence corresponds to one consumer (an EventHandler or a WorkHandler).
    //Per the article, they are added through SEQUENCE_UPDATER by RingBuffer.addGatingSequences()
    //when Disruptor.handleEventsWith() and similar methods run (that code is not part of this excerpt).
    protected volatile Sequence[] gatingSequences = new Sequence[0];
    ...

    //Create with the specified buffer size and wait strategy.
    //@param bufferSize The total number of entries, must be a positive power of 2.
    //@param waitStrategy
    //@throws IllegalArgumentException if bufferSize is < 1 or not a power of 2
    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }
    ...
}

//Seven unused long fields declared ahead of nextValue/cachedValue (padding).
abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

//The two plain (non-volatile) fields used by the single producer.
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    //The producer's current sequence; starts at -1
    protected long nextValue = Sequence.INITIAL_VALUE;
    //Cached minimum consumer sequence; starts at -1
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

//Sequencer for a single producer: claims sequences with plain field updates.
public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    //Seven unused long fields declared after nextValue/cachedValue (padding)
    protected long p1, p2, p3, p4, p5, p6, p7;

    //Construct a Sequencer with the selected wait strategy and buffer size.
    //@param bufferSize the size of the buffer that this will sequence over.
    //@param waitStrategy for those waiting on sequences.
    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
    ...

    @Override
    public long next() {
        return next(1);
    }

    //Claims the next n sequences and returns the highest one, parking in 1ns steps while
    //the claim would run more than bufferSize ahead of the slowest gating sequence.
    //@throws IllegalArgumentException if n < 1
    @Override
    public long next(int n) {
        //A Sequence is initialised to -1
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        //nextValue is the current sequence; this.nextValue is a field of SingleProducerSequencerFields.
        //Worked example with n = 1: on calls 1..5 of next(), nextValue = -1, 0, 1, 2, 3.
        long nextValue = this.nextValue;
        //On calls 1..5, nextSequence = 0, 1, 2, 3, 4.
        long nextSequence = nextValue + n;
        //wrapPoint is used to decide whether the producer has wrapped around the ring:
        //negative means it has not wrapped yet, non-negative means it has.
        //Worked example assuming bufferSize = 3
        //(illustrative only: the constructor above rejects sizes that are not a power of 2):
        //call 1: wrapPoint = 0 - 3 = -3, not wrapped
        //call 2: wrapPoint = 1 - 3 = -2, not wrapped
        //call 3: wrapPoint = 2 - 3 = -1, not wrapped
        //call 4: wrapPoint = 3 - 3 = 0, wrapped
        //call 5: wrapPoint = 4 - 3 = 1, wrapped
        long wrapPoint = nextSequence - bufferSize;
        //cachedGatingSequence caches the minimum consumer sequence so that next()
        //does not have to recompute it on every call.
        //In the example it is -1 on calls 1..4 and 1 on call 5.
        long cachedGatingSequence = this.cachedValue;
        //On call 4, wrapPoint (0) is greater than cachedGatingSequence (-1), so the branch is taken
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            //Minimum consumer sequence
            long minSequence;
            //Spin loop:
            //Util.getMinimumSequence(gatingSequences, nextValue) finds the smallest consumer sequence
            //(or nextValue if that is smaller or there are no gating sequences).
            //While wrapPoint is greater than that minimum, the producer thread parks for 1ns and retries.
            //On call 4: nextValue = 2, wrapPoint = 0; if the consumers have not consumed anything yet
            //(their sequences are still -1) the producer has to wait.
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                //TODO: Use waitStrategy to spin?
                LockSupport.parkNanos(1L);
            }
            //cachedValue takes the minimum consumer sequence.
            //On call 4, assuming minSequence is 1, cachedValue becomes 1.
            this.cachedValue = minSequence;
        }
        //After calls 1..5, nextValue becomes 0, 1, 2, 3, 4.
        this.nextValue = nextSequence;
        //Calls 1..5 return nextSequence = 0, 1, 2, 3, 4.
        return nextSequence;
    }

    @Override
    public void publish(long sequence) {
        //Set the producer's sequence on the cursor
        cursor.set(sequence);
        //Notify blocked consumers through the wait strategy
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}

public final class Util {
    ...

    //Get the minimum sequence from an array of {@link com.lmax.disruptor.Sequence}s.
    //@param sequences to compare.
    //@param minimum an initial default minimum. If the array is empty this value will be returned.
    //@return the smaller of minimum sequence value found in sequences and minimum; minimum if sequences is empty
    public static long getMinimumSequence(final Sequence[] sequences, long minimum) {
        for (int i = 0, n = sequences.length; i < n; i++) {
            long value = sequences[i].get();
            minimum = Math.min(minimum, value);
        }
        return minimum;
    }
    ...
}

//Sequencer for multiple producers: claims sequences with a CAS retry loop on the cursor.
public final class MultiProducerSequencer extends AbstractSequencer {
    ...

    @Override
    public long next() {
        return next(1);
    }

    //Claims the next n sequences and returns the highest one.
    //Retries until the CAS on the cursor succeeds; parks for 1ns whenever the claim would
    //run more than bufferSize ahead of the slowest gating sequence.
    //@throws IllegalArgumentException if n < 1
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            //Read the producers' current sequence
            current = cursor.get();
            next = current + n;
            //wrapPoint is used to decide whether the producer has wrapped around the ring:
            //negative means it has not wrapped yet, non-negative means it has.
            long wrapPoint = next - bufferSize;
            //cachedGatingSequence caches the minimum consumer sequence so that next()
            //does not have to recompute it on every call.
            long cachedGatingSequence = gatingSequenceCache.get();
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                //gatingSequence is the minimum consumer sequence
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                if (wrapPoint > gatingSequence) {
                    //TODO, should we spin based on the wait strategy?
                    LockSupport.parkNanos(1);
                    continue;
                }
                //Refresh the cache; the loop then runs again and attempts the CAS on a later pass
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                //The CAS succeeded, so this thread owns the range ending at next
                break;
            }
        } while (true);
        return next;
    }
    ...
}
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等