MQ基础组件的封装原创
金蝶云社区-艾贺521
艾贺521
10人赞赏了该文章 541次浏览 未经作者许可,禁止转载编辑于2019年02月27日 08:20:04
封面


这里主要说一些一线大厂的关于MQ的组件实现思路和架构设计方案,看下图:

可靠性组件是最重要的组件,因为任何的基础组件都要保证消息的可靠性传递。对于中间状态的消息,需要做一些补偿的处理。MessafeStoreService




需要实现功能点:(最基本的要求)

  • 首先要支持高性能的序列化,异步发送消息

  • 生产实例与消费实例连接缓存化池化,提升性能

  • 支持可靠性投递,100%不丢失,金融项目的一个大单操作,优先级也要设置的比较高

  • 消费端幂等操作,避免消费端重复操作

扩展的相关要求:

  • 支持迅速的把消息发出去,有些不需要保证100%可靠,比如日志,重点在于吞吐量

  • 支持延迟消息,延迟的把消息投递,用于某些延迟检查,服务限流场景

  • 支持事务消息,且100%可靠性投递,在金融类单笔金额数量大操作时有次场景,比如一笔100万的转账

  • 支持顺序消息,列入下单的复合性操作

  • 支持补偿,重试,以及快速订单问题,异常

  • 支持集群负载均衡,支持消息路由策略,路由到集群,路由到内部队列




消息的迅速发送

迅速是指不进行落库,不做可靠性保障,重点在于吞吐量。主要是非核心的数据,日志数据,或者统计分析数据

,直接将消息发送,也不需要消费者进行确认。



优点:高性能,吞吐量大

缺点:可靠性没有保障


确认消息发送

消息确认模式是不需要消费端进行的

  • Step1和Step2是同时的,业务数据和消息数据同时入库,进行持久化,如果无异常,下一步

  • Step3,把消息投递给Broker端

  • Step4 Broker,即Broker进行回复消息是否被Producer组件收到应答消息

  • Step5 收到Broker之后,把上次的消息查出来,再进行更新状态

  • Step6 如果出现问题的话,通过定时任务抓取一些中间状态的消息

  • Step7 然后进行重新发送,如果失败次数太多可以进行补偿


批量发送消息

批量消息是把消息放到一个集合里,统一进行提交,这种方案期望消息在一个会话里,比如统一提交到ThreadLocal里的集合,然后把当前线程内所有的消息集合一次性提交的。可以设计一个类,在Channel中一次发送就可以。这种方式也是希望消费者在消费的时候,可以进行批量化的消费,针对某一个原子业务的操作去处理,但是不保证可靠性,需要进行补偿机制。

不保证可靠性,可以保证原子性。



和确认消息发送只有Step1和Step2有点区别:

  • Step1, 首先业务入库,然后用Batch组件把消息进行组装为集合

  • Step2,把多条消息入库,入库为一条记录即可。剩余的即确认消息的那些步骤


延迟消息发送

在消息封装的时候加上delayTime即可。就是一个属性即可,然后用延迟插件。

业务场景:自动超时作废,优惠券过期,自动确认收货等


顺序消息发送

类似与批量消息,但是还有点不同,要保证以下几点:

  • 同一个队列

  • 只能有一个消费者

  • 然后统一提交(可以是一个大消息,也可以是多个消息),并且所有的消息会话ID保持一致。

  • 添加消息的属性,顺序标记序号,和本次顺序消息的Size属性,进行落库操作

  • 收到延迟消息之后,根据会话Id,Size抽取数据库进行处理

消费者获取的时候不用直接消费,而是先落库,然后延迟一会再去处理。

生产端如果没有完全的投递消息,6条消息只发了5条,那么必须要做补偿。

生产端:

  • 通过生产端顺序组件进行排序,入库发送


消费端:

  • Step1 首先把收到的消息入库

  • Step2 同时发送给自己延迟消息,消息为刚才收到的

  • Step3 几分钟之后,刚才发送的延迟消息也收到了

  • Step4 执行生产端的业务操作

  • Step5 如果失败补偿,还是如何处理还是需要一个定时任务


事务消息发送

使用中比较少见,但是如果非常大的单笔交易金额的时候,这个时候消息的优先级肯定是最高,并且可靠性要达到100%的,自己的系统和银行系统都要这样,并且也会有一些补偿机制,主动的向银行发起指令。半小时之内一定要有个结果,如果10分钟还没有结果,一般要找运维及时知道结果。


保障性能同时,还要支持事务。使用RabbitMQ事务和Spring集成机制,在进行压测的情况时候,效果并不理想。


方案:

  • 可靠性投递,也需要做一些补偿处理。业务操作数据库DB1和消息记录DB2数据库使用同一个数据源

  • 重写DatasourceTransactionManager,(如果事务提交成功,但是消息发送失败,这个时候就需要补偿了)


与可靠性消息,确认消息类似,但是要保证业务数据库与消息数据层面要一致。(消息数据库与业务数据库同时成功,或者同时失败)


消息的幂等性保障

使用MQ的过程中至关重要的一个环节。

原因:

  • 可靠性投递的时候,比如ack的时候网断了,重新发送消息的时候又发出去了

  • MQ的Broker服务与消费端传输消息的时候网络抖动

  • 消费端故障或者异常



统一Id生成服务:同一个消息的ID保证是唯一的,并且每次生成的ID值是相同的,在进行消息入库的时候,利用数据库的唯一索引,同一ID只能插入一次。

本地ID生成服务:全局的降级策略,如果全局的id一直没有生成成功,那么用本地的ID生成服务



最后

主要就是说了下大厂中一些消息组件的方案和思路,希望能开阔大家思路,能在实际工作中更好的使用。



注:

本文独家发布自金蝶云社区


赞 10