Disruptor使用原创
金蝶云社区-艾贺521
艾贺521
73人赞赏了该文章 690次浏览 未经作者许可,禁止转载编辑于2019年06月14日 21:56:31


Disruptor是LMAX公司开源的一款高性能的多线程通信库。Java的队列在高并发场景下会带来延迟,而LMAX目标是成为世界上最快的交易平台,也就是系统要有非常低的延迟和很好的吞吐量。为了优化Java队列的延迟问题,LMAX研发了Disruptor。

Disruptor不是仅仅为金融领域专用的,在解决并发编程的难题上,它都是可以适用的。

在普通的并发编程中,CPU级别的缓存未命中,还有内核对锁的操作都是有很大的开销,Disruptor则是无锁的。


实战

在Disruptor入门中,我们会考虑用一个简单的例子来理解它,从producer中传递一个Long类型的值,然后在consumer中将数字输出。

引入依赖:

      <dependency>
           <groupId>com.lmax</groupId>
           <artifactId>disruptor</artifactId>
           <version>3.2.0</version>
       </dependency>


  1. 首先,定义要携带数据的事件对象。

public class LongEvent
{
  private long value;

  public void set(long value)
  {
      this.value = value;
  }
}


  1. 为了Disrutpor可以预先分配这些对象,还需要创建一个事件工厂。

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>
{
  public LongEvent newInstance()
  {
      return new LongEvent();
  }
}


  1. 当我们的事件工厂创建完毕以后,我们需要创建consume来处理这些事件,本次我们只需要在控制台中输出事件携带的数据就好。

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
  public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
  {
      System.out.println("Event: " + event);
  }
}


  1. 发布事件,在Disruptor 3.0之后,更喜欢使用Event Publisher/Event Translator来发布事件。使用这种方式的好处是,translator代码可以独立出来,并且更加容易进行测试,Disruptor也提供了很多内置的(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg)等,

publicclassLongEventProducerWithTranslator{
   privatefinalRingBuffer<LongEvent>ringBuffer;

   publicLongEventProducerWithTranslator(RingBuffer<LongEvent>ringBuffer) {
       this.ringBuffer=ringBuffer;
  }

   privatestaticfinalEventTranslatorOneArg<LongEvent, ByteBuffer>TRANSLATOR=
           newEventTranslatorOneArg<LongEvent, ByteBuffer>() {
               @Override
               publicvoidtranslateTo(LongEventevent, longsequence, ByteBufferbb) {
                   event.set(bb.getLong(0));
              }
          };

   publicvoidonData(ByteBufferbb) {
       ringBuffer.publishEvent(TRANSLATOR, bb);
  }
}


除了使用Translator,还有一种更加常见的方法,但是这种方法在多发布者的场景中,可能会出现一些无法预期的问题,因此还是建议使用Translator。

// 不建议使用
@Deprecated
public class LongEventProducer {
  private final RingBuffer<LongEvent> ringBuffer;

  public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
      this.ringBuffer = ringBuffer;
  }

  public void onData(ByteBuffer bb) {
      long sequence = ringBuffer.next(); // Grab the next sequence
      try {
          LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
          // for the sequence
          event.set(bb.getLong(0)); // Fill with data
      } finally {
          ringBuffer.publish(sequence);
      }
  }
}


  1. 将整个过程串联起来。


    image.png

publicclassLongEventMain{
   publicstaticvoidmain(String[] args) throwsInterruptedException{
       // 构建Disrutor对象
       Disruptor<LongEvent>disruptor=newDisruptor<LongEvent>(newLongEventFactory(),
               1024*1024,
               Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()),
               ProducerType.SINGLE,
               newBusySpinWaitStrategy()
      );

       // 设置消费者
       disruptor.handleEventsWith(newLongEventHandler());

       // 启动Disrutor
       disruptor.start();


       RingBuffer<LongEvent>ringBuffer=disruptor.getRingBuffer();
       LongEventProducerWithTranslatorproducer=newLongEventProducerWithTranslator(ringBuffer);

       ByteBufferbb=ByteBuffer.allocate(8);
       for(longl=0; true; l++)
      {
           bb.putLong(0, l);
           producer.onData(bb);
           Thread.sleep(1000);
      }

  }
}


  1. 查看运行结果

image.png


用法就是上面那些了,还有一些Disrutor的具体使用,可以结合实际的业务场景。


扩展

在创建Disrutor对象的时候,需要指定RingBuffer的等待策略,默认的测试是BlockingWaitStrategy,阻塞等待。也就是使用锁,然后等待其它的线程进行唤醒。除了阻塞等待之外,还有一些系统提供的策略,当然也可以自定义等待策略

  • SleepingWaitStrategy 在不需要低延迟的场景下,可以使用它,内部用的是LockSupport.parkNanos。

  • YieldingWaitStrategy  内部使用的是Threa.yield,允许其他的线程先进行,比较推荐的使用方式

  • BusySpinWaitStrategy 一般在实际的消费者数量小于核心线程数的时候使用,因为它会不断的循环,占用CPU资源


其它用法

我看到内部的系统再使用Disrutor的时候,在消费事件的时候,将事件类型进行转换,再次异步处理。

publicclassAlarmDisruptorConsumerimplementsWorkHandler<AlarmDisruptorVO>{

   @Override
   publicvoidonEvent(AlarmDisruptorVOevent) throwsException{

       if(event!=null) {
           process(event.getAlarmMsgVO());
           process(event.getAlarmTaskVO());
      }
  }


   publicvoidprocess(AlarmMsgVOmsgVO) {

       if(msgVO==null) {
           return;
      }
       AlarmMsgHandlerServicealarmMsgHandlerService=getAlarmMsgHandlerService();
       // 将消息转换为任务
       alarmMsgHandlerService.alarmMsg2Task(msgVO);
  }

   publicvoidprocess(AlarmTaskVOmsgVO) {
       if(msgVO==null) {
           return;
      }
       AlarmMsgHandlerServicealarmMsgHandlerService=getAlarmMsgHandlerService();
       // 处理任务
       alarmMsgHandlerService.alarmTask2Process( msgVO);

  }

   privateAlarmMsgHandlerServicegetAlarmMsgHandlerService(){
       return(AlarmMsgHandlerService) SpringApplicationContext.getBean("alarmMsgHandlerService");
  }
}



最后

关于Disrutor的使用就说到这了,如果JDK内部的并发编程框架不能满足你对性能上的要求,那么这是一个值得尝试的方案。


参考


赞 73