如果在互联网行业,大家都知道公司的业务有一条最主要的链路,各种业务都围绕这条链路在处理。就比如说在淘宝购物的时候,选商品,加购物车,下单,付款。在租车行业,则是下单预定,排车,取车,还车,确认账单结算。
一般核心链路的代码是非常复杂的,业务逻辑非常复杂,有的是串行执行的,有的是并行执行的,根据需求,如果能并行的,就并行,来提高吞吐量,如果可以异步的,就采用异步的方式,用MQ来处理。
服务之间的调用,不仅要求很高的性能,另外要有一定的降级策略,如果是强依赖的服务关系,一定要有兜底,如果是弱依赖,可以进行降级。
核心链路的特点:
最重要的业务,逻辑复杂。核心链路有各种各样的服务,对于租车,比如:短租,分时租,月租,年租,国际租。同样的链路不同的服务。对于不同服务,实现方式:
完全解耦的方式:代码之间不重复,每个服务,业务线都有自己的代码,业务线之间代码不重复。
模板模式:因为每个业务线之间有重复的地方,把共同的地方单独抽取出来,然后其他业务在模板上进行开发。这种方式耦合性会比较强。
Disruptor非常灵活,可以根据我们的链路图进行编码,不管是有时候一些并行的操作,还是需要串行的,或者更加复杂的链路图形。
我们知道核心链路非常复杂,而Disruptor可以灵活应对即可,接下来我们看下Disruptor对于一些复杂流程的编程实现。
案例
如果对Disruptor不了解的同学,先看这篇对Disruptor的介绍,再来看这篇高阶的部分。Disruptor入门
在开始之前,先看下Disruptor的处理器方法:
控制顺序的EventHandlerGroup
串行链路
串行是最简单的方式,如果是串行操作,常规的方法就可以,但是Disruptor还是给我们提供了串行的写法。
我们先写几个Handler来处理生产者的数据。事件仍然是上面给的入门例子中的LongEvent。
public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { // System.out.println("Event: " + longEvent.getValue()); System.out.println("Handler 1进行处理: XXXX" + longEvent.getValue()); } } public class LongEventHandler2 implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println("Handler 2进行处理: YYYY" + longEvent.getValue()); } } public class LongEventHandler3 implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println("Handler 3进行处理: ZZZZ" + longEvent.getValue()); } }
2 只需要在Disruptor启动的时候,指定处理器的方式如下,记得使用的是then,then在英语中代表的就是然后的意思。
3 启动后可以看到如下结果,可以看到确实是按照串行的方式来进行处理的
记得用:
disruptor.handleEventsWith(new LongEventHandler()) .then(new LongEventHandler2()) .then(new LongEventHandler3()) ;
并行链路
有的时候,想让程序并行来执行:
如果想要使用并行链路来进行处理,在上面的设置消费者代码中并行设置如下:
// 设置消费者 disruptor.handleEventsWith(new LongEventHandler()); disruptor.handleEventsWith(new LongEventHandler2()); disruptor.handleEventsWith(new LongEventHandler3());
2 因为我们刚才已经设置了3个Handler,直接用来测试程序即可。
3 测试结果如下
可以看到程序没有安装消费者的顺序进行执行,消费者之间是并行处理的。除了刚开始的写法,直接都传入参数的写法也可以
额外写法
如下写法,也是可以让消费者进行并行的处理。
disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler2(),new LongEventHandler3());
多边形操作
当链路既有并行,又有串行的方式的时候,如下:
因为上面我们已经给出了代码,那么这次只需要变换下写法就行了。
disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler2()) .then(new LongEventHandler3()) ;
可以看到,handler1与handler2是并行的,并且只有在Handler1与2执行完毕之后才会执行handler3.
多生产者多消费者
业务链路在更复杂的时候,可能涉及到多个生产者多个消费者,但是不用担心,Disruptor给我们提供了对应的功能。
在创建Disruptor的时候指定生产类型为ProducerType.Multi
多生产者多消费者依赖workerPool实现多消费者。
下面是开发的时候大体框架
publicstaticvoidmain(String[] args) throwsInterruptedException{ // 1. 创建RingBuffer RingBuffer<LongEvent>ringBuffer=RingBuffer.create( ProducerType.MULTI, newLongEventFactory(), 1024*1024, newYieldingWaitStrategy() ); // 2. 通过RingBuffer创建一个屏障 SequenceBarriersequenceBarrier=ringBuffer.newBarrier(); // 3. 多消费者多生产者模式,构建多消费者,消费者必须要实现 // WorkHandler // 创建多个消费者数组 LongEventHandler[] consumers=newLongEventHandler[10]; for(inti=0; i<10; i++) { consumers[i] =newLongEventHandler("C"+i); } // 4. 构建多消费者的工作池 WorkerPool<LongEvent>workerPool=newWorkerPool<>( ringBuffer, sequenceBarrier, newIgnoreExceptionHandler(), consumers ); // 5. 设置多个消费者的sequence序号, 用于单独统计消费 // 设置到RingBuffer中 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); // 6. 启动workerPool ExecutorServicethreadPool=Executors.newFixedThreadPool(4); workerPool.start(threadPool); // 7. 设置100个生产者 finalCountDownLatchlatch=newCountDownLatch(1); for(inti=0; i<100; i++) { LongEventProducerlongEventProducer=newLongEventProducer(ringBuffer); newThread(){ @Override publicvoidrun() { try{ latch.await(); } catch(InterruptedExceptione) { } for(intj=0; j<100; j++) { longEventProducer.onData(Long.valueOf(j)); } } }.start(); } Thread.sleep(2000); System.out.println("----开始生产数据-----"); latch.countDown(); Thread.sleep(5000); for(inti=0; i<10; i++) { System.out.println("第"+i+"个消费者消费数量:"+consumers[i].getCount()); } threadPool.shutdown(); workerPool.halt(); }
计算机总共就4个核,所以只有4个消费者可以消费
最后
在LMAX交易平台上,其号称借助Disruptor每秒可以处理600万笔订单数据,总之性能非常强大。这里我们虽然没有测试其性能,但是通过案例可以看到其灵活性。
通过引入Disruptor,可以让核心链路更加健壮和可扩展。在真实的场景中核心的链路远比今天所举的例子复杂,但是相信你已经掌握了Disruptor的灵活使用,肯定可以应对的。
参考
注:
本文独家发布自金蝶云社区