Disruptor中怎么实现一个高性能队列,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

创新互联是专业的辉县网站建设公司,辉县接单;提供成都网站制作、成都网站设计,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行辉县网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy}
/**
 * Minimal Disruptor demo: a single producer publishes the integers 0 to 99 through a
 * 4-slot ring buffer, and each event is handled by `TestHandler` and then by `ThenHandler`.
 */
object DisruptorTest {

  /** The configured (not yet started) disruptor together with its two-stage handler chain. */
  val disruptor: Disruptor[Event] = {
    // Supplies the placeholder events (id = -1) that the ring buffer is pre-filled with.
    val eventFactory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }
    // Wraps every runnable handed over by the disruptor in a new, default-configured thread.
    val workerThreads = new ThreadFactory {
      override def newThread(task: Runnable): Thread = new Thread(task)
    }
    val waitStrategy = new BlockingWaitStrategy()
    val instance =
      new Disruptor[Event](eventFactory, 4, workerThreads, ProducerType.SINGLE, waitStrategy)
    // `then` is a Scala keyword, hence the backticks.
    instance.handleEventsWith(TestHandler).`then`(ThenHandler)
    instance
  }

  /** Copies the published argument into the claimed ring-buffer slot and logs the hand-over. */
  val translator: EventTranslatorOneArg[Event, Int] = new EventTranslatorOneArg[Event, Int] {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: $event, sequence: $sequence, arg: $arg")
    }
  }

  /** Starts the disruptor, publishes 100 events and then shuts the disruptor down. */
  def main(args: Array[String]): Unit = {
    disruptor.start()
    for (value <- 0 until 100) disruptor.publishEvent(translator, value)
    disruptor.shutdown()
  }
}
/**
 * Payload stored in the ring buffer.
 *
 * `id` is deliberately a `var`: the demo's translator overwrites it on an existing instance
 * (`event.id = arg`) instead of allocating a new event.
 */
case class Event(var id: Int) {
  /** Renders the event as `event: <id>`. */
  override def toString: String = "event: " + id
}
/** First stage of the demo's handler chain: logs every event it receives. */
object TestHandler extends EventHandler[Event] {
  /** Prints this handler's simple class name, the current `System.nanoTime()` and the event. */
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    val handlerName = getClass.getSimpleName
    val timestamp = System.nanoTime()
    println(s"$handlerName $timestamp $event")
  }
}
/** Second stage of the demo's handler chain (registered via `then`): logs every event it receives. */
object ThenHandler extends EventHandler[Event] {
  /** Prints this handler's simple class name, the current `System.nanoTime()` and the event. */
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    val handlerName = getClass.getSimpleName
    val timestamp = System.nanoTime()
    println(s"$handlerName $timestamp $event")
  }
}
先看 Disruptor 构造方法
/**
 * Disruptor constructor used by the demo: builds the ring buffer via RingBuffer.create and
 * wraps the thread factory in a BasicExecutor, then delegates to the
 * (ringBuffer, executor) constructor.
 */
public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
}
再看 RingBuffer.create:最终通过 fill 方法,将 eventFactory.newInstance() 创建的对象作为默认值,预先填充到 ringBuffer 的每一个槽位里面。
/** Dispatches on the producer type to the matching ring-buffer factory method. */
public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

/** Creates a ring buffer backed by a SingleProducerSequencer. */
public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}

/**
 * RingBufferFields constructor: validates the buffer size (at least 1 and a power of two),
 * allocates the padded entries array and pre-fills it with events from the factory.
 */
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();
    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }
    // With a power-of-two size, bufferSize - 1 is a bit mask of the low bits.
    this.indexMask = bufferSize - 1;
    // BUFFER_PAD extra slots are reserved on each side of the usable entries.
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

/** Stores one factory-created event in every usable slot (the padding slots are skipped). */
private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}
首先看 disruptor.start(): 消费事件消息入口
private final ConsumerRepositoryconsumerRepository = new ConsumerRepository<>(); public RingBuffer start() { checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer; }
consumerRepository 中的内容由 disruptor.handleEventsWith(TestHandler) 填充(每个 handler 会被包装成一个 BatchEventProcessor 并加入其中),同时构造出事件消息处理链
public final EventHandlerGrouphandleEventsWith(final EventHandler super T>... handlers) { return createEventProcessors(new Sequence[0], handlers); } EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences, final EventHandler super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler super T> eventHandler = eventHandlers[i]; final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences); }
回头看 disruptor.start() 中的 consumerInfo.start(executor):这里 executor = new BasicExecutor(threadFactory)。BasicExecutor 在每次 execute 任务时都会 new thread,但是 consumerRepository 中消费者的数量是有限的,所以 new thread 也没啥问题
public Disruptor( final EventFactoryeventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final ProducerType producerType, final WaitStrategy waitStrategy) { this( RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), new BasicExecutor(threadFactory)); } private Disruptor(final RingBuffer ringBuffer, final Executor executor) { this.ringBuffer = ringBuffer; this.executor = executor; } @Override public void start(final java.util.concurrent.Executor executor){ //EventProcessor extends Runnable //executor = BasicExecutor executor.execute(eventprocessor); } public final class BatchEventProcessor implements EventProcessor { @Override public void run() { if (running.compareAndSet(IDLE, RUNNING)) { sequenceBarrier.clearAlert(); notifyStart(); try { if (running.get() == RUNNING) { processEvents(); } } finally { notifyShutdown(); running.set(IDLE); } } else { if (running.get() == RUNNING) { throw new IllegalStateException("Thread is already running"); } else { earlyExit(); } } } } private void processEvents() { T event = null; long nextSequence = sequence.get() + 1L; while (true) { try { final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (running.get() != RUNNING) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } }
executor.execute 也就是 BasicExecutor.execute(eventProcessor),会异步地执行 eventProcessor(即包装了 eventHandler 的 BatchEventProcessor),也就是在新线程中调用 BatchEventProcessor.run 方法
问题来了,既然是异步执行,多个 eventHandler 是怎么按照顺序去处理事件消息的?
我们看 processEvents 方法执行逻辑
先获取 BatchEventProcessor.sequence 并 +1
通过 sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 获取到可用的 availableSequence
先看下 BlockingWaitStrategy.waitFor 的实现
/**
 * Waits until the requested sequence can be consumed and returns the sequence that is
 * actually available.
 *
 * Phase 1: while the cursor is behind the requested sequence, wait on
 * processorNotifyCondition under the lock.
 * Phase 2: busy-loop until dependentSequence has reached the requested sequence.
 * barrier.checkAlert() is called in both loops; the method declares AlertException.
 *
 * @param sequence          the sequence the caller wants to process next
 * @param cursorSequence    the cursor compared against {@code sequence} in phase 1
 * @param dependentSequence the sequence the caller must not overtake
 * @param barrier           the barrier whose alert state is checked while waiting
 * @return the value read from {@code dependentSequence}; it is >= {@code sequence}
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,
SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
// Unlocked check first, so the lock is skipped when the cursor has already caught up.
if (cursorSequence.get() < sequence) {
lock.lock();
try {
// Re-check in a loop while holding the lock before each wait on the condition.
while (cursorSequence.get() < sequence) {
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally {
lock.unlock();
}
}
// The cursor has caught up; now loop until the dependency has caught up as well.
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
}
return availableSequence;
}如果 cursorSequence(ringbuffer 的索引) < sequence(batchEventProcessor 的索引) 则batchEventProcessor挂起等待 否则 就用 dependentSequence作为 availableSequence 返回 然后 batchEventProcessor 会将 availableSequence 索引之前的数据一次性处理完,并更新自身的 sequence 索引值
dependentSequence 由 ProcessingSequenceBarrier 构造方法初始化
/**
 * SequenceBarrier implementation (excerpt: only the fields and the constructor are shown).
 * It records which sequence a consumer has to wait for: the cursor itself when there are
 * no upstream dependencies, otherwise a group over the upstream sequences.
 */
final class ProcessingSequenceBarrier implements SequenceBarrier {
private final WaitStrategy waitStrategy;
// The sequence this barrier's consumers must stay behind; chosen in the constructor.
private final Sequence dependentSequence;
// Alert flag, initially false; its readers and writers are not part of this excerpt.
private volatile boolean alerted = false;
private final Sequence cursorSequence;
private final Sequencer sequencer;
/**
 * @param sequencer          stored as-is
 * @param waitStrategy       stored as-is
 * @param cursorSequence     the cursor; it doubles as the dependency when none are given
 * @param dependentSequences upstream sequences to depend on; may be empty
 */
ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
final Sequence cursorSequence, final Sequence[] dependentSequences) {
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
// No upstream dependencies: depend directly on the cursor.
if (0 == dependentSequences.length) {
dependentSequence = cursorSequence;
} else {
// Otherwise wrap all upstream sequences in a single FixedSequenceGroup.
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
}在 Disruptor.createEventProcessors 中的, 进行了初始化 ProcessingSequenceBarrier final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences) createEventProcessors 仅会被 Disruptor.handleEventsWith和 EventHandlerGroup.handleEventsWith
public class Disruptor{ public final EventHandlerGroup handleEventsWith(final EventHandler super T>... handlers) { return createEventProcessors(new Sequence[0], handlers); } EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences, final EventHandler super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler super T> eventHandler = eventHandlers[i]; final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences); } } public class EventHandlerGroup { private final Disruptor disruptor; private final ConsumerRepository consumerRepository; private final Sequence[] sequences; EventHandlerGroup(final Disruptor disruptor, final ConsumerRepository consumerRepository, final Sequence[] sequences) { this.disruptor = disruptor; this.consumerRepository = consumerRepository; this.sequences = Arrays.copyOf(sequences, sequences.length); } public final EventHandlerGroup handleEventsWith(final EventHandler super T>... handlers) { return disruptor.createEventProcessors(sequences, handlers); } public final EventHandlerGroup then(final EventHandler super T>... handlers) { return handleEventsWith(handlers); } }
EventHandlerGroup 会拷贝一份 batchEventProcessor 的 sequence 数组(数组里的 Sequence 对象本身是共享的)。demo 例子中 disruptor.handleEventsWith(TestHandler).then(ThenHandler) 通过 then 方法,将 TestHandler 的 sequence 作为 barrierSequences 传给 ThenHandler 的 SequenceBarrier。这样 ThenHandler 就依赖了 TestHandler:对同一个事件,ThenHandler 一定在 TestHandler 处理之后才会执行
接着看 disruptor.publishEvent(translator, i),它就是往 ringBuffer 里面放数据:
public void publishEvent(EventTranslatorOneArgtranslator, A arg0) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); } private void translateAndPublish(EventTranslatorOneArg translator, long sequence, A arg0) { try { translator.translateTo(get(sequence), sequence, arg0); } finally { sequencer.publish(sequence); } } public E get(long sequence) { return elementAt(sequence); }
get(sequence) 根据 sequence(ringbuffer 索引)获取 ringbuffer 数组里的对象;translator 对该对象赋值之后,ringbuffer 数组中这个位置的对象就带上了新的值。publish 会更新 cursor(索引的标记位),随后 waitStrategy.signalAllWhenBlocking() 会通知阻塞等待的消费者继续消费消息
// Cursor sequence, created with Sequencer.INITIAL_CURSOR_VALUE as its starting value.
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
/**
 * Publishes the given sequence: moves the cursor to it, then asks the wait strategy to
 * signal waiting consumers.
 *
 * @param sequence the sequence to make visible to consumers
 */
@Override
public void publish(long sequence) {
// Set the cursor first, then signal: consumers compare the cursor with the sequence they want.
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}流程理清楚了,我们看看 知识点
ringbuffer
内存使用率很高,不会造成内存碎片,几乎没有浪费。业务处理的同一时间,访问的内存数据段集中。 可以更好的适应不同系统,取得较高的性能。内存的物理布局简单单一,不太容易发生内存越界、悬空指针等 bug,出了问题也容易在内存级别分析调试。 做出来的系统容易保持健壮。
cpu cache
CPU 访问内存时会等待,导致计算资源大量闲置,降低 CPU 整体吞吐量。 由于内存数据访问的热点集中性,在 CPU 和内存之间用较为快速而成本较高(相对于内存)的介质做一层缓存,就显得性价比极高了
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。