Disruptor在互联网核心链路概念与应用原创
金蝶云社区-艾贺521
艾贺521
7人赞赏了该文章 698次浏览 未经作者许可,禁止转载编辑于2019年06月14日 21:59:44

如果在互联网行业,大家都知道公司的业务有一条最主要的链路,各种业务都围绕这条链路在处理。就比如说在淘宝购物的时候,选商品,加购物车,下单,付款。在租车行业,则是下单预定,排车,取车,还车,确认账单结算。

一般核心链路的代码是非常复杂的,业务逻辑非常复杂,有的是串行执行的,有的是并行执行的,根据需求,如果能并行的,就并行,来提高吞吐量,如果可以异步的,就采用异步的方式,用MQ来处理。


服务之间的调用,不仅要求很高的性能,另外要有一定的降级策略,如果是强依赖的服务关系,一定要有兜底,如果是弱依赖,可以进行降级。


核心链路的特点:

  • 最重要的业务,逻辑复杂。核心链路有各种各样的服务,对于租车,比如:短租,分时租,月租,年租,国际租。同样的链路不同的服务。对于不同服务,实现方式:

    • 完全解耦的方式:代码之间不重复,每个服务,业务线都有自己的代码,业务线之间代码不重复。

    • 模板模式:因为每个业务线之间有重复的地方,把共同的地方单独抽取出来,然后其他业务在模板上进行开发。这种方式耦合性会比较强。



Disruptor非常灵活,可以根据我们的链路图进行编码,不管是有时候一些并行的操作,还是需要串行的,或者更加复杂的链路图形。

我们知道核心链路非常复杂,而Disruptor可以灵活应对即可,接下来我们看下Disruptor对于一些复杂流程的编程实现。


案例

如果对Disruptor不了解的同学,先看这篇对Disruptor的介绍,再来看这篇高阶的部分。Disruptor入门


在开始之前,先看下Disruptor的处理器方法:

image.png


控制顺序的EventHandlerGroup

image.png

串行链路

串行是最简单的方式,如果是串行操作,常规的方法就可以,但是Disruptor还是给我们提供了串行的写法。

image.png

  1. 我们先写几个Handler来处理生产者的数据。事件仍然是上面给的入门例子中的LongEvent。

public class LongEventHandler implements EventHandler<LongEvent> {
  @Override
  public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
//       System.out.println("Event: " + longEvent.getValue());
      System.out.println("Handler 1进行处理: XXXX" + longEvent.getValue());
  }
}

public class LongEventHandler2 implements EventHandler<LongEvent> {
  @Override
  public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
      System.out.println("Handler 2进行处理: YYYY" + longEvent.getValue());

  }
}

public class LongEventHandler3 implements EventHandler<LongEvent> {
  @Override
  public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
      System.out.println("Handler 3进行处理: ZZZZ" + longEvent.getValue());

  }
}


2 只需要在Disruptor启动的时候,指定处理器的方式如下,记得使用的是then,then在英语中代表的就是然后的意思。

image.png


3 启动后可以看到如下结果,可以看到确实是按照串行的方式来进行处理的

image.png


记得用:

disruptor.handleEventsWith(new LongEventHandler())
  .then(new LongEventHandler2())
  .then(new LongEventHandler3())
;


并行链路

有的时候,想让程序并行来执行:

image.png

  1. 如果想要使用并行链路来进行处理,在上面的设置消费者代码中并行设置如下:

// 设置消费者
disruptor.handleEventsWith(new LongEventHandler());
disruptor.handleEventsWith(new LongEventHandler2());
disruptor.handleEventsWith(new LongEventHandler3());


2 因为我们刚才已经设置了3个Handler,直接用来测试程序即可。

3 测试结果如下

image.png


可以看到程序没有安装消费者的顺序进行执行,消费者之间是并行处理的。除了刚开始的写法,直接都传入参数的写法也可以


额外写法

如下写法,也是可以让消费者进行并行的处理。

disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler2(),new LongEventHandler3());



多边形操作

当链路既有并行,又有串行的方式的时候,如下:

image.png


因为上面我们已经给出了代码,那么这次只需要变换下写法就行了。

      disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler2())
          .then(new LongEventHandler3())
      ;



可以看到,handler1与handler2是并行的,并且只有在Handler1与2执行完毕之后才会执行handler3.

image.png


多生产者多消费者

业务链路在更复杂的时候,可能涉及到多个生产者多个消费者,但是不用担心,Disruptor给我们提供了对应的功能。

  1. 在创建Disruptor的时候指定生产类型为ProducerType.Multi

image.png


  1. 多生产者多消费者依赖workerPool实现多消费者。

下面是开发的时候大体框架


   publicstaticvoidmain(String[] args) throwsInterruptedException{
       // 1. 创建RingBuffer
       RingBuffer<LongEvent>ringBuffer=RingBuffer.create(
               ProducerType.MULTI,
               newLongEventFactory(),
               1024*1024,
               newYieldingWaitStrategy()
      );

       // 2. 通过RingBuffer创建一个屏障
       SequenceBarriersequenceBarrier=ringBuffer.newBarrier();

       // 3. 多消费者多生产者模式,构建多消费者,消费者必须要实现
       // WorkHandler
       // 创建多个消费者数组
       LongEventHandler[] consumers=newLongEventHandler[10];
       for(inti=0; i<10; i++) {
           consumers[i] =newLongEventHandler("C"+i);
      }

       // 4. 构建多消费者的工作池
       WorkerPool<LongEvent>workerPool=newWorkerPool<>(
               ringBuffer,
               sequenceBarrier,
               newIgnoreExceptionHandler(),
               consumers
      );


       // 5. 设置多个消费者的sequence序号, 用于单独统计消费
       // 设置到RingBuffer中
       ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

       // 6. 启动workerPool
       ExecutorServicethreadPool=Executors.newFixedThreadPool(4);
       workerPool.start(threadPool);

       // 7. 设置100个生产者
       finalCountDownLatchlatch=newCountDownLatch(1);
       for(inti=0; i<100; i++) {
           LongEventProducerlongEventProducer=newLongEventProducer(ringBuffer);
           newThread(){
               @Override
               publicvoidrun() {
                   try{
                       latch.await();
                  } catch(InterruptedExceptione) {

                  }

                   for(intj=0; j<100; j++) {
                       longEventProducer.onData(Long.valueOf(j));
                  }
                   
              }
          }.start();
      }

       Thread.sleep(2000);
       System.out.println("----开始生产数据-----");
       latch.countDown();
       Thread.sleep(5000);

       for(inti=0; i<10; i++) {
           System.out.println("第"+i+"个消费者消费数量:"+consumers[i].getCount());
      }

       threadPool.shutdown();
       workerPool.halt();

  }



计算机总共就4个核,所以只有4个消费者可以消费

image.png




最后

在LMAX交易平台上,其号称借助Disruptor每秒可以处理600万笔订单数据,总之性能非常强大。这里我们虽然没有测试其性能,但是通过案例可以看到其灵活性。

通过引入Disruptor,可以让核心链路更加健壮和可扩展。在真实的场景中核心的链路远比今天所举的例子复杂,但是相信你已经掌握了Disruptor的灵活使用,肯定可以应对的。



参考


注:

本文独家发布自金蝶云社区


图标赞 7
7人点赞
还没有人点赞,快来当第一个点赞的人吧!
图标打赏
0人打赏
还没有人打赏,快来当第一个打赏的人吧!