记录HR项目中使用zato对接魔学院培训系统消息转发到钉钉原创
金蝶云社区-kd程明
kd程明
3人赞赏了该文章 243次浏览 未经作者许可,禁止转载编辑于2023年07月12日 10:52:47

客户使用了魔学院、金蝶sHR,移动端使用的是钉钉,shr这边实现了和魔学院的组织人员对接、PC端单点登陆对接,没有实现消息对接,魔学院也不支持再将消息与第三方对接,需要人力系统想办法进行对接。

需求是期望培训提醒消息能推送到钉钉。刚好最近在研究zato,相关的对接逻辑通过研究钉钉和魔学院的接口文档,大致了解,感觉不难,实际操作的时候各种奇奇怪怪的问题。真的是看起来容易,做起来难,断断续续做了有几个月的时间,从zato安装、尝试写第一个service,部署centos环境,梳理对接逻辑,一点点编写对接代码,本地调试,服务器部署等等。个人对sql比较熟悉,python也是简单学过,过程中遇到的坑基本上都是网上检索到的。

进入正题:

实现效果:

3b0ba2d0b2e29c8e5ac00851298619b.jpg

zato安装不再赘述,网上很多教程,建议参考zato.io里面相关的介绍来做,会比较官方一些。

对接分为三个部分:

  1. 订阅魔学院消息。这部分主要过程为获取魔学院accesstoken,发送订阅请求,接收返回的加密数据,解密数据,检查是否为check_url事件,是,则返回加密的success,结束。

  2. 接收魔学院消息转发钉钉。这部分主要过程是被动接收魔学院发送来的消息,解密消息,根据消息内容获取用户手机号、姓名,存数据库,获取钉钉accesstoken,根据手机号获取用户钉钉id,发送消息后接收回执存库。

  3. 从钉钉消息单点登陆到魔学院。

代码主要分为两部分:魔学院部分和钉钉部分。

魔学院部分:

# -*- coding: utf-8 -*-
# zato: ide-deploy=True

# Zato
from zato.server.service import Service
from dingcrypt import DingCallbackCrypto3
from contextlib import closing
from zato.common.odb.api import SQLConnectionPool
from zato.common.util.api import get_engine_url,get_component_name
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from urllib.parse import urlparse,parse_qsl,quote,urlencode
import json
import sys
sys.path.append(r'/opt/zato/env/qs-1/server1/pickup/incoming/services')
import os
from accesstoken import accesstokencache
from zato.common import DATA_FORMAT
#print(Service)
# ##############################################################################

class GetMXYAccessToken(Service):
    """ Returns details of a user by the person's ID.
    """
    name = 'api.mxy.gettoken'

    def handle(self):
        params = {'corpid':'xxxx', 'corpsecret':'xxxx'}
        conn= self.out.rest['mxyAccessToken'].conn
        #获取请求参数
        rqparam=self.request.payload
        #先从缓存加载token数据
        json_dict=accesstokencache.get_value('mxy')
        #加载不成功或者token过期则刷新token
        if(json_dict==None or rqparam['movement']=='refresh'):
            response=conn.get(self.cid, params)
            tokenDict=response.data
            print(tokenDict)
            accesstokencache.set_value('mxy',tokenDict['results']['access_token'],tokenDict['results']['expires_in'])
            json_dict=accesstokencache.get_value('mxy')
        self.response.payload = json_dict


class HandleSubscribe(Service):
    name='api.mxy.handlesubscribe'
    def handle(self):
        conn=self.out.rest['mxySubscribe'].conn
        #self.logger.info(self.invoke('api.mxy.gettoken'))
        refresh_data={'movement':'refresh'}
        get_data={'movement':'get'}
        #内部调用获取token服务
        token=self.invoke('api.mxy.gettoken',get_data,DATA_FORMAT.JSON)
        params = {'access_token':token, 'channel':'message','url':'https://baidu.com'}
        response=conn.get(self.cid, params)
        result=response.data
        #返回错误码为401重新获取token
        if(result['errcode']==401):
            token=self.invoke('api.mxy.gettoken',refresh_data,DATA_FORMAT.JSON)
            params = {'access_token':token, 'channel':'message','url':'https://baidu.com'}
            result=conn.get(self.cid, params)

class MessageRecive(Service):
    
    name='api.mxy.messagereceive'
    def mxyMessegeInsertTB(self,param):
        existTemplate='''SELECT EXISTS (
        SELECT FROM 
        pg_tables
        WHERE 
        tablename  = 't_mxy_message'
        );'''
        tableTemplate='''create table t_mxy_message(
        id int not null PRIMARY KEY generated always as identity,
	    messagetext text,
        sender text,
        userid text,
        phonelist text,
        username text,
        title text,
        messagecontent text,
        contentType text,
        linkurlh5 text,
        dduserid text,
        easuserid text,
        ddtaskid text
        );'''
        insertTemplate='insert into t_mxy_message(messagetext,sender,userid,title,messagecontent,contentType,linkurlh5,phonelist,username) values(:messagetext,:sender,:userid,:title,:messagecontent,:contentType,:linkurlh5,:phonelist,:username) '
        parameters=param
        self.logger.info(parameters)
        dburl=get_engine_url(self.out.sql.get('shrDev').config)
        self.logger.info(dburl)
        self.logger.info(self.out.sql.get('shrDev').config)
        engine=create_engine(dburl)
        Session=sessionmaker(bind=engine)
        with closing(Session()) as session:
            if(False==session.execute(existTemplate).first()[0]):
                session.execute(tableTemplate)
                session.commit()
            session.execute(insertTemplate,parameters)
            session.commit()

    def mxyUserInfoGet(self,userid):
        conn=self.out.rest['mxyGetUserInfo'].conn
        refresh_data={'movement':'refresh'}
        get_data={'movement':'get'}
        #内部调用获取token服务
        token=self.invoke('api.mxy.gettoken',get_data,DATA_FORMAT.JSON)
        params={'access_token':token,'userid':userid}
        response=conn.get(self.cid,params)
        result=response.data
        if(result['errcode']==401):
            token=self.invoke('api.mxy.gettoken',refresh_data,DATA_FORMAT.JSON)
            response=conn.get(self.cid, params)
        result={'mobile':response.data['results']['mobile'],'name':response.data['results']['name']}

        return result
        
    def handle(self):
        mxycrypt=DingCallbackCrypto3("xxx", "xxx", "xxx",1)
        self.logger.info(self.request.payload)
        self.logger.info(self.request.payload.decode('utf-8'))
        rcData=dict(parse_qsl(self.request.payload.decode('utf-8')))
        self.logger.info(rcData)
        rcParam=self.request.http.params
        self.logger.info(rcData['encrypt'])
        self.logger.info(self.request.http.params)
        self.logger.info(rcParam.signature)
        self.logger.info(rcParam.timestamp)
        self.logger.info(rcParam.nonce)
        decryptResult=mxycrypt.getDecryptMsg(rcParam.signature,rcParam.timestamp,rcParam.nonce,rcData['encrypt'])
        self.logger.info(decryptResult)
        self.logger.info(isinstance(decryptResult,str))
        checkUrlDict='{"eventType":"check_url"}'
        
        if decryptResult==checkUrlDict:
            json_dict=mxycrypt.getEncryptedMap("success")
            self.response.payload=json_dict
            return
        jsonResult=json.loads(decryptResult)
        phoneList=[]
        nameList=[]
        for userid in jsonResult['userid']:
            phoneList.append(self.mxyUserInfoGet(userid)['mobile'])
            nameList.append(self.mxyUserInfoGet(userid)['name'])
        param={'messagetext':decryptResult,'sender':jsonResult['sender'],'userid':','.join(jsonResult['userid']),'title':jsonResult['title'],'messagecontent':jsonResult['content'],'contentType':jsonResult['contentType'],'linkurlh5':jsonResult['linkUrl']['h5'],'phonelist':','.join(phoneList),'username':','.join(nameList)}
        self.mxyMessegeInsertTB(param)
        self.invoke('api.dingtalk.sendmessage')

钉钉部分:

# -*- coding: utf-8 -*-
# zato: ide-deploy=True

# Zato
from zato.server.service import Service
from zato.common.util.api import get_engine_url
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import closing
import time
import sys
sys.path.append(r'/opt/zato/env/qs-1/server1/pickup/incoming/services')
import os
from accesstoken import accesstokencache
from zato.common import DATA_FORMAT

# ##############################################################################


class GetAccessToken(Service):
    """ Returns details of a user by the person's ID.
    """
    name = 'api.dingtalk.gettoken'

    def handle(self):
        params = {'appKey':'xxxx', 'appSecret':'xxx'}
        conn= self.out.rest['ddAccessToken'].conn
        rqparam=self.request.payload
        #获取请求参数
        json_dict=accesstokencache.get_value('dd')
        #加载不成功或者token过期则刷新token
        if(json_dict==None or rqparam['movement']=='refresh'):
            response=conn.post(self.cid, params)
            tokenDict=response.data
            accesstokencache.set_value('dd',tokenDict['accessToken'],tokenDict['expireIn'])
            json_dict=accesstokencache.get_value('dd')
        self.response.payload = json_dict

class SendMessage(Service):
    name= 'api.dingtalk.sendmessage'
    

    def ddGetUserInfo(self,mobile):
        payload = {'mobile':mobile,'support_exclusive_account_search':'true'}
        refresh_data={'movement':'refresh'}
        get_data={'movement':'get'}
        tokendata =self.invoke('api.dingtalk.gettoken',get_data,DATA_FORMAT.JSON)
        params ={'access_token':tokendata}
        conn= self.out.rest['dingtalkGetbymobile'].conn
        response = conn.post(self.cid, payload,params)
        result=None
        if(response.data['errcode']==88):
            tokendata =self.invoke('api.dingtalk.gettoken',refresh_data,DATA_FORMAT.JSON)
            params ={'access_token':tokendata}
            response = conn.post(self.cid, payload,params)
        if(response.data['errcode']==0):
            result = response.data['result']['exclusive_account_userid_list']
        return result

    def handle(self):
        agentid='123'
        refresh_data={'movement':'refresh'}
        get_data={'movement':'get'}
        messageTemplate='select id,phonelist,title from t_mxy_message where ddtaskid is null'
        updateTemplate='update t_mxy_message set ddtaskid=:taskid,dduserid=:dduserid where id=:id '
        tokendata =self.invoke('api.dingtalk.gettoken',get_data,DATA_FORMAT.JSON)
        params ={'access_token':tokendata}
        dburl=get_engine_url(self.out.sql.get('db').config)
        engine=create_engine(dburl)
        Session=sessionmaker(bind=engine)
        conn= self.out.rest['dingtalkSendMessage'].conn
        
        with closing(Session()) as session:
            #返回结果为(True,)但不知为什么使用in,判断结果为Ture
            result=session.execute(messageTemplate)
            if(result==None):
                return
            for item in result:
                userList=[]
                for phone in item.phonelist.split(','):
                    self.logger.info(phone)
                    userid=self.ddGetUserInfo(phone)
                    if(userid!=None):                
                        userList+=userid
                userid_list=','.join(userList)
                payload={
                    'agent_id':agentid,
                    'userid_list':'123456',
                    'msg':{'msgtype':'text',
                     'text':{'content':item.title}

                }
                }
                response = conn.post(self.cid, payload,params)
                respResult=response.data
                if(respResult['errcode']==88):
                    tokendata =self.invoke('api.dingtalk.gettoken',refresh_data,DATA_FORMAT.JSON)
                    params ={'access_token':tokendata}
                    response = conn.post(self.cid, payload,params)
                    respResult=response.data
                if(respResult['errcode']==0):
                    task_id=respResult['task_id']
                    paramDict={'dduserid':userid_list,'taskid':task_id,'id':item.id}
                    session.execute(updateTemplate,paramDict)
                    session.commit()

个人评价,代码质量一般,只能算是个脚本的水平,不过基本上都是自己摸索着写的,参考了一些eas移动端对接的标品写法,借着这个机会,也把python又复习了一遍。

zato还是挺有意思的,比较专业的一个工具,用好了可以事半功倍。



赞 3