MQ服务调用模型原创
金蝶云社区-sharkv
sharkv
9人赞赏了该文章 2,425次浏览 未经作者许可,禁止转载编辑于2021年11月01日 17:34:08
summary-icon摘要由AI智能服务提供

文本主要介绍了MQ(消息队列)的调用模型及其工作原理,包括MQ服务启动时如何根据配置文件初始化队列信息、启动消费者线程并订阅队列主题,业务服务如何发送消息到MQ,以及消费者线程如何消费消息。同时,文本还讨论了队列阻塞的可能原因及解决方式,包括检查MQ运行状态、vhost设置、二开队列配置和消费者插件部署等,以及常见的MQ使用问题和调试方法。

标签:

MQ、消息发送、消息队列

MQ调用模型示意图

image.png


图解说明:

Ⅰ.首先MQService启动时会根据队列信息配置文件(如:1.平台的/kd/bos/mq/mqqueueconfig.xml,2.各自业务的3.二开的)初始化队列信息,启动消费者线程,并订阅相关队列主题

Ⅱ.当业务服务产生消息时会发送到mq组件中(如单据提交到工作流)

Ⅲ.订阅方线程消费相关消息

 

注意:

如果发现你的队列阻塞了,有可能是MQ应用服务初始化消费者线程的时候失败了。这时你需要注意一下以下过程:

首先我们看一个队列配置信息的示例

<region name="workflow" appid="wf">
    <queue name="messagecenter_service">
        <consumer class="kd.bos.workflow.message.service.impl.MessageCenterConsumer"></consumer>
    </queue>
</region>

在这个配置文件中,region配置了一个appid=wf(粒度可以细化到queue一级),appid决定了这个消费者线程会在哪一个应用中去启动

例如:当前节点采用分应用部署的模式,且系统变量appids没有wf,那么当前节点就不会去消费该队列的消息

 

常见问题

1)轻量级环境,工作流提交后收不到消息

System.setProperty("dubbo.registry.register", "true");

System.setProperty("mq.consumer.register", "true");//该参数为false,本节点将不会消费mq消息

 

2)团队版开发环境,工作流提交后偶发收不到消息

System.setProperty("mq.debug.queue.tag", "每个人的专属tag");

 

3)生产环境工作流提交后收不到消息,但开发环境可以

检查生产环境是否分应用部署的,bos应用下的appids要加上wf

 

4) 以上方式都不能解决怎么办

①检查mq是否正常运行 如 telnet xxx.xxx 5672

②测试环境和其它环境共用vhost,导致测试环境的消息被其它环境消费,此时需要检查所有环境的vhost设置,保证每个环境使用独立的vhost(一般可用集群名)

 

5)二开队列的配置文件怎么部署?消费者插件怎么部署?

①首先找到二开队列配置文件里面的appid所在的服务节点及插件工程

②消费者插件应该部署在此工程下

③同时在消息发送方/消费方 工程下,添加二开队列的配置文件

④同时在消息发送方/消费方 服务节点处, 增加启动参数mqConfigFiles.config=配置文件的相对路径

注:非分应用部署则appid为空,配置文件及配置参数配置任意一方即可

常用调试入口

1)MQ应用服务启动时是否正常

kd.bos.mq.init.MQInit.init()

kd.bos.mq.config.UsageConfig.get()//可调试队列信息配置文件是否正常加载

2)调试消息中心发送消息

调用代码:kd.bos.servicehelper.workflow.MessageCenterServiceHelper.sendMessage(MessageInfo)

消费方:kd.bos.workflow.message.service.impl.MessageCenterConsumer

查看消息是否正常发送:select * from t_wf_msgfail,关键字段【retry:重发次数,state:消息状态】

state枚举值:【normal:业务堆积,success:推送成功,fail:推送失败,intervene:业务干预】

1.png


3)查看队列是否阻塞

登录monitor查看


 

2.png

    

 


赞 9