背景:
获取一些信息时常要去看手机,很不方便,便希望推送消息到钉钉查看。
整体思路和操作都不难,但中间仍有好多坑… 有点时间不写python了做个笔记熟悉下…
思路分解:
- 先实现推送成功;
- 获取要推送的内容;
- 实现定时调度;
具体操作
1. 实现推送成功
-
获取钉钉的access_token 、自定义关键词、密钥(如果有需要的话),添加好钉钉机器人,这个比较简单就不多赘述了。
-
推送操作
推送获取的 access_token 、自定义关键词、密钥不一定要全部添加,一般自己看的内容加关键词即可(钉钉的安全升级新版,旧版的关键词都可不加); -
拼接方式:
webhook = 'https://oapi.dingtalk.com/robot/send?access_token=access_token值' + "×tamp="+ timestamp + "&sign=" + sign
- 具体代码及实现:
import time
import hmac
import hashlib,base64 # 加解密
import urllib.parse
import requests,json
# @加签 密钥
# @安全方式使用加签的方式:timestamp+"\n"+密钥当做签名字符串,用HmacSHA256算法计算签名,进行Base64 encode,最后把签名参数再进行urlEncode,得到最终的签名(需要使用UTF-8字符集);
# 这一段主要实现加签的密钥处理
timestamp = str(round(time.time() * 1000))
secret = 'SECfedb5d37632e3b25fc751bc33ffd9f3ce88f29be554f5486f0e05f4166f85e5c'
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote(base64.b64encode(hmac_code))
print(timestamp)
print(sign) # 可通过查看这里的sign查看密钥
# 定义数据类型
headers = {'Content-Type': 'application/json'}
# 初始化钉钉内容
webhook = 'https://oapi.dingtalk.com/robot/send?access_token=64e7627e740e0c00882237b482dc5e8421e48105ffb9aa4854c88218526d29cf' + "×tamp="+ timestamp + "&sign=" + sign # 加密钥写法
# 定义要发送的数据
# "at": {"atMobiles": "['"+ mobile + "']"
data = { "msgtype": "text",
"text": {"content": '猪猪发功' }, # 自定义词
"isAtAll": True} # @all 貌似没啥用
# 发送post请求到钉钉
res = requests.post(webhook, data=json.dumps(data), headers=headers)
print(res.text)
当然这只是最基础的文本推送,推送还可有非常多的种类,包含图片、图文、卡片等等,详情可参考此链接。
2.获取推送内容
这里以获取股票、基金为主,本质都是接口or爬虫信息获取渠道和方式的差异;
获取方式有多种,比如tushare包、akshare包、爬虫、一系列的金融平台接口等,这里主要讲tushare包;
import requests,json,time,datetime,tushare
from dingtalkchatbot.chatbot import DingtalkChatbot, CardItem, ActionCard
# @获取基金信息
class Jijin():
def __init__(self,code):
self.jijin_code = code
def get_jijin(self):
url = 'http://fundgz.1234567.com.cn/js/'+self.jijin_code+'.js'
res = requests.get(url)
res.encoding = res.apparent_encoding
html = res.text
name = html.split('name":"')[1].split('",')[0]
now_time = html.split('gztime":"')[1].split('"})')[0]
updown = html.split('gszzl":"')[1].split('",')[0]
info = '基金名称:' + name + " 时间:" + now_time + ' 涨跌:' + updown
return info
# @获取股票信息
class Gupiao():
def __init__(self,code):
self.share_code = code
def get_share(self):
data = tushare.get_realtime_quotes(self.share_code)
# code = data.loc[0]['code']
name = data.loc[0][0]
open = str(data.loc[0][1]) # 开盘价
price = str(data.loc[0][3]) # 目前市场价
high = str(data.loc[0][4]) # 最高价
low = str(data.loc[0][5]) # 最低价
message = '股票编号:{},股票名称:{},当前价格{},今日开盘价{},今日最高价{},今日最低价{}'.format(self.share_code,name,price,open,high,low)
return message
# @获取基金/股票信息 选取了加注的几个
funds = ['159992','515170','005827']
funds_new = ''
for fund in funds:
fund_mes = Jijin(fund).get_jijin()
# funds_new.append(mes+'\n')
funds_new += fund_mes+'\n'
stocks = ['688365','600336']
stocks_new = ''
for stock in stocks:
stock_mes = Gupiao(stock).get_share()
stocks_new += '{} {}'.format(stock_mes, '\n')
# @钉钉推送 除了上面代码写的 res发post请求外,还有现成的包可用如下
from dingtalkchatbot.chatbot import DingtalkChatbot, CardItem, ActionCard
# 初始化钉钉信息 如果之前写过的可不写,调用包就无需拼接webhook串了
webhook = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxx'
secret = 'xxxxx'
dingding = DingtalkChatbot(webhook, secret=secret, pc_slide=True, fail_notice=True)
# 最后一切就绪,直接调用推送;
dingding.send_text(msg='\n猪猪发功: 恭喜nannan老板钻大钱