场景
我们在使用maxcompute进行数据处理,我们会需要对上下游的数据质量作一些规则校验,用以来识别是否有脏数据影响了我们的数据质量。
而校验后,若是发现了脏数据,就需要第一时间通知到数据负责人,以便于数据质量负责人/数据运维角色的人员可以及时得知问题,让他们有充分的数据对数据进行处理,尽快修正数据问题;而且告知信息需要的是准确,有含义,才能让人更快定位原因。
而监控数据并通知到数据质量负责人/数据运维的方式有很多,常见的有使用外置数据质量工具,但是工具类天生就很重,开发难度,后去运维对于小团队而言是1个很不理想的方案;或者可以使用阿里云的DQC质量工具,但是该工具无法自定义消息内容,告知信息不友好等问题。
上面2种方式都有各自的问题,那么我们就只能自己造1个小而轻的*,这个*的实现原理是使用 odps SQL + pyodps + 钉钉群 ,其中 ODPS SQL 负责数据质量校验并生成错误消息,PYODPS 负责把消息推送到对应的钉钉群中。
应该准备什么
1.打通maxcompute的钉钉白名单
在阿里云maxcompute管理控制台-maxcompute项目-选中“项目配置”
在工作空间配置中,配置钉钉OPEN API的链接(oapi.dingtalk.com)和端口(80/443)
2.配置钉钉机器人
群机器人是钉钉群的高级扩展功能。群机器人可以将第三方服务的信息聚合到群聊中,实现自动化的信息同步。目前,大部分机器人在被添加后,还需要进行Webhook配置,才可正常使用。
另外,群机器人支持Webhook协议的自定义接入,例如防疫精灵机器人可以为企业提供新型冠状病毒疫情实况推送、预防措施自动问答服务,帮助员工在新冠疫情期间获取最新疫情、最新权威预防措施。群成员可以直接@防疫精灵机器人进行疫情实况和预防措施问答,机器人也会定时推送最新疫情给群成员。
钉钉群机器人致力于把钉钉群聊的消息输出到外部,以及接收外部的消息输入。
配置方式
1)钉钉群-智能群助手-添加机器人-自定义机器人
,输入机器人名字并选择要发送消息的群。如果需要的话,可以为机器人设置一个头像。点击“完成”,完成后会生成Hook地址,为了安全性,可以选择“加签”获取密文
2)复制机器人的webhook地址和签名
https://oapi.dingtalk.com/robot/send?access_token=c58321ceae9b13cb64a0924636d3d1de1fc409760c2f3a21b9a4bc3c47b12be7
钉钉机器人简介
(1)获取到Webhook地址后,用户可以向这个地址发起HTTP POST 请求,即可实现给该钉钉群发送消息。注意,发起POST请求时,必须将字符集编码设置成UTF-8。
(2)当前自定义机器人支持文本 (text)、链接 (link)、markdown(markdown)、ActionCard、FeedCard消息类型,大家可以根据自己的使用场景选择合适的消息类型,达到最好的展示样式。
(3)自定义机器人发送消息时,可以通过手机号码指定“被@人列表”。在“被@人列表”里面的人员收到该消息时,会有@消息提醒(免打扰会话仍然通知提醒,首屏出现“有人@你”)。
(4)当前机器人尚不支持应答机制 (该机制指的是群里成员在聊天@机器人的时候,钉钉回调指定的服务地址,即Outgoing机器人)。
(5)每个机器人每分钟最多发送20条。消息发送太频繁会严重影响群成员的使用体验,大量发消息的场景 (譬如系统监控报警) 可以将这些信息进行整合,通过markdown消息以摘要的形式发送到群里。
好了,到这里基本的提前准备工作已经告一段落,下面进入实现环节。
实现路径
# -*- coding: utf-8 -*-
import datetime
import sys
import time
import hmac
import hashlib
import base64
import urllib
import urllib2
import json
reload(sys)
sys.setdefaultencoding('utf8')
###webhook地址和签名
webhook = '''
https://oapi.dingtalk.com/robot/send?access_token=c58321ceae9b13cb64a0924636d3d1de1fc409760c2f3a21b9a4bc3c47b12be7
'''
secretkey = '*******输入签名****'
###消息
msg = '测试'
###日期时间
now = datetime.datetime.now()
sub = datetime.timedelta(days=-1)
now = now + sub
now_str = now.strftime('%Y-%m-%d')
timestamp = long(round(time.time() * 1000))
secret_enc = bytes(secretkey).encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secretkey)
string_to_sign_enc = bytes(string_to_sign).encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.quote_plus(base64.b64encode(hmac_code))
webhook = webhook.strip()
webhook = '%s×tamp=%d&sign=%s' % (webhook, timestamp, sign)
param = {
"msgtype": "text",
"text": {
"content": msg
}
}
data = json.dumps(param)
###发送消息
request = urllib2.Request(webhook, data, {'Content-Type': 'application/json;charset=utf-8'})
response = urllib2.urlopen(request)
result = 'code:%d,msg:%s' % (response.getcode(), json.dumps(response.readlines()[0]))
print result
结果