文本主要介绍了MQ(消息队列)的调用模型及其工作原理,包括MQ服务启动时如何根据配置文件初始化队列信息、启动消费者线程并订阅队列主题,业务服务如何发送消息到MQ,以及消费者线程如何消费消息。同时,文本还讨论了队列阻塞的可能原因及解决方式,包括检查MQ运行状态、vhost设置、二开队列配置和消费者插件部署等,以及常见的MQ使用问题和调试方法。
标签:
MQ、消息发送、消息队列
MQ调用模型示意图
图解说明:
Ⅰ.首先MQService启动时会根据队列信息配置文件(如:1.平台的/kd/bos/mq/mqqueueconfig.xml,2.各自业务的3.二开的)初始化队列信息,启动消费者线程,并订阅相关队列主题
Ⅱ.当业务服务产生消息时会发送到mq组件中(如单据提交到工作流)
Ⅲ.订阅方线程消费相关消息
注意:
如果发现你的队列阻塞了,有可能是MQ应用服务初始化消费者线程的时候失败了。这时你需要注意一下以下过程:
首先我们看一个队列配置信息的示例
<region name="workflow" appid="wf"> <queue name="messagecenter_service"> <consumer class="kd.bos.workflow.message.service.impl.MessageCenterConsumer"></consumer> </queue> </region>
在这个配置文件中,region配置了一个appid=wf(粒度可以细化到queue一级),appid决定了这个消费者线程会在哪一个应用中去启动
例如:当前节点采用分应用部署的模式,且系统变量appids中没有wf,那么当前节点就不会去消费该队列的消息
常见问题
1)轻量级环境,工作流提交后收不到消息
System.setProperty("dubbo.registry.register", "true");
System.setProperty("mq.consumer.register", "true");//该参数为false,本节点将不会消费mq消息
2)团队版开发环境,工作流提交后偶发收不到消息
System.setProperty("mq.debug.queue.tag", "每个人的专属tag");
3)生产环境工作流提交后收不到消息,但开发环境可以
检查生产环境是否分应用部署的,bos应用下的appids要加上wf
4) 以上方式都不能解决怎么办
①检查mq是否正常运行 如 telnet xxx.xxx 5672
②测试环境和其它环境共用vhost,导致测试环境的消息被其它环境消费,此时需要检查所有环境的vhost设置,保证每个环境使用独立的vhost(一般可用集群名)
5)二开队列的配置文件怎么部署?消费者插件怎么部署?
①首先找到二开队列配置文件里面的appid所在的服务节点及插件工程
②消费者插件应该部署在此工程下
③同时在消息发送方/消费方 工程下,添加二开队列的配置文件
④同时在消息发送方/消费方 服务节点处, 增加启动参数mqConfigFiles.config=配置文件的相对路径
注:非分应用部署则appid为空,配置文件及配置参数配置任意一方即可
常用调试入口
1)MQ应用服务启动时是否正常
kd.bos.mq.init.MQInit.init()
kd.bos.mq.config.UsageConfig.get()//可调试队列信息配置文件是否正常加载
2)调试消息中心发送消息
调用代码:kd.bos.servicehelper.workflow.MessageCenterServiceHelper.sendMessage(MessageInfo)
消费方:kd.bos.workflow.message.service.impl.MessageCenterConsumer
查看消息是否正常发送:select * from t_wf_msgfail,关键字段【retry:重发次数,state:消息状态】
state枚举值:【normal:业务堆积,success:推送成功,fail:推送失败,intervene:业务干预】
3)查看队列是否阻塞
登录monitor查看
COSMIC_MQ服务调用模型.pdf(758.39KB)
推荐阅读