(1)谷歌云monitoring服务里面创建自定义查询,发送到日志接收器,过滤gke日志如下:
(2)日志接收器选一个类似kafka的组件(pub/sub),然后针对那个topic创建消息订阅,再其创建日志触发器(python脚本),产生一条日志,就会实时发送到kakfa,然后被订阅者消费,同时触发脚本处理日志,筛选重要的推送钉钉。
这就是日志服务发送到kafka的日志,然后用脚本处理这条日志就可以,看谷歌云日志触发器语法,自己写脚本处理日志:
## ================================================== ## 让读书成为一种生活方式。就像吃喝拉撒每天必须要干的事, ## 终有一天你的举止、言谈、气质会不一样。 ## —- 5sdba ## ## Created Date: Saturday, 2021-07-08, 11:55:05 am ## copyright (c): SZWW Tech. LTD. ## Engineer: async ## Module Name: ## Revision: v0.01 ## Description: ## ## Revision History : ## Revision editor date Description ## v0.01 async 2021-07-08 File Created ## ================================================== import requests import json,hmac,base64,hashlib import urllib.parse import datetime,time import pytz tz = pytz.timezone(‘Asia/Shanghai‘) URL = "https://oapi.dingtalk.com/robot/send?access_token=cxxxxxx" secret = "xxxxxx" def get_timestamp_sign(): timestamp = str(round(time.time() * 1000)) secret_enc = secret.encode(‘utf-8‘) string_to_sign = ‘{}\n{}‘.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode(‘utf-8‘) hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return (timestamp, sign) def get_signed_url(): timestamp, sign = get_timestamp_sign() webhook = URL + "×tamp="+timestamp+"&sign="+sign return webhook def get_webhook(mode): if mode == 0: # only 敏感字 webhook = URL elif mode == 1 or mode ==2 : # 敏感字和加签 或 # 敏感字+加签+ip webhook = get_signed_url() else: webhook = "" print("error! mode: ",mode," webhook : ",webhook) return webhook def send_msg(event,context): webhook = get_webhook(1) str_log = base64.b64decode(event[‘data‘]).decode(‘utf-8‘) k8s_log=json.loads(str_log) print(k8s_log) ops_time = datetime.datetime.strptime(k8s_log["receiveTimestamp"].replace("‘", ‘‘).replace(‘T‘, ‘ ‘).split(‘.‘)[0],"%Y-%m-%d %H:%M:%S") + datetime.timedelta(hours=8) ops_cluster = k8s_log[‘resource‘][‘labels‘][‘cluster_name‘] # ops_pod = k8s_log[‘jsonPayload‘][‘metadata‘][‘namespace‘] ops_pod=k8s_log[‘jsonPayload‘][‘metadata‘][‘name‘].split(‘.‘)[0] ops_namespace=k8s_log[‘jsonPayload‘][‘metadata‘][‘namespace‘] ops_type = k8s_log[‘jsonPayload‘][‘metadata‘][‘managedFields‘][0][‘operation‘] ops_info=k8s_log[‘severity‘] if ‘WARNING‘ in ops_info: log_pri="<font color=#D3F545 size=3>warning</font>" elif ‘INFO‘ in ops_info: log_pri="<font color=#008000 size=3> info </font>" else: log_pri="<font color=#FF4109 size=3> sos </font>" ops_log = k8s_log[‘jsonPayload‘][‘message‘] if ops_type not in [‘Pulled‘, ‘Scheduled‘]: pagrem = { "msgtype": "markdown", "markdown": { "title": "K8s ops" + "....", "text": "<font color=#FF0000 size=3>操作详情:</font>" + "\n\n>日志时间 :" + str(ops_time) + "\n\n>集群名称:" + str(ops_cluster) + "\n\n>pod组名:" + str(ops_pod) + "\n\n>命名空间:" + str(ops_namespace) + "\n\n>日志级别:" + str(log_pri) + "\n\n>日志详情:" + str(ops_log) }, "at": { "atMobiles": [ "xxxxx" ] }, "isAtAll": "False" } headers = { ‘Content-Type‘: ‘application/json‘ } requests.post(url=webhook, data=json.dumps(pagrem), headers=headers) print(json.dumps(pagrem))
(3)最后完成的效果如下:
可以再优化一下脚本,做一些详细的判断,达到自己需要的目标。