"""ZooKeeper monitoring: report how far each canal destination's cursor lags."""
import json
from datetime import datetime, timedelta, timezone

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

# ZooKeeper connection string (placeholder host:port values, as in the original).
ZK_HOSTS = 'host1:port,host2:port,host3:port'
# Parent znode whose children are the canal destination (instance) names.
DESTINATIONS_PATH = '/otter/canal/destinations'
# Cursor znode of one destination; "1001" is the path segment used by the
# original code (presumably the canal client id -- confirm against deployment).
CURSOR_PATH_TEMPLATE = '/otter/canal/destinations/{instance}/1001/cursor'
# A destination is counted as delayed when its lag exceeds this many minutes.
DELAY_ALERT_MINUTES = 60
# Cursor times are displayed in UTC+8, matching the original "+8 hours" shift.
DISPLAY_TZ = timezone(timedelta(hours=8))


def canal_monitor(**kwargs):
    """Print the consume cursor time and delay of every canal destination.

    Connects to ZooKeeper, lists the children of ``DESTINATIONS_PATH`` and,
    for each non-empty child name, reads its cursor znode and prints one
    status line. Destinations without a cursor znode are reported by printing
    the ``NoNodeError`` and are otherwise skipped. When at least one
    destination lags by more than ``DELAY_ALERT_MINUTES``, a summary line is
    printed after the connection has been closed.

    Args:
        **kwargs: Accepted and ignored, so the function can be invoked by a
            caller that passes extra keyword arguments.
    """
    zk = KazooClient(hosts=ZK_HOSTS)
    zk.start()
    count = 0
    err_count = 0
    try:
        for instance in zk.get_children(DESTINATIONS_PATH):
            if not instance:
                continue
            count += 1
            try:
                result = zk.get(CURSOR_PATH_TEMPLATE.format(instance=instance))
            except NoNodeError as e:
                # No cursor znode for this destination: report it and move on.
                print(e)
                continue
            cursor_time, delay_minutes = get_time(result)
            print('{}.canal consume cursor:{},delay {} minutes'.format(
                instance,
                cursor_time.strftime('%Y-%m-%d %H:%M:%S'),
                delay_minutes,
            ))
            if delay_minutes > DELAY_ALERT_MINUTES:
                err_count += 1
    finally:
        # Always release the session, even when a read or a parse fails.
        zk.stop()
        zk.close()
    if err_count != 0:
        print('zookeeper监控完毕,当前订阅业务数{},{}延迟!'.format(str(count), str(err_count)))


def get_time(result):
    """Extract the cursor time and its delay from a cursor znode read.

    Args:
        result: Sequence whose first item is the UTF-8 encoded JSON payload of
            the cursor znode (as returned by ``KazooClient.get``). The payload
            must contain ``postion.timestamp``, which is treated as epoch
            milliseconds.

    Returns:
        A ``(cursor_time, delay_minutes)`` tuple: ``cursor_time`` is a naive
        ``datetime`` expressed in UTC+8, and ``delay_minutes`` is the whole
        number of minutes between the cursor timestamp and the current time.
        It is negative when the cursor timestamp is ahead of the local clock.

    Raises:
        KeyError: If the payload lacks ``postion`` or ``timestamp``.
        ValueError: If the payload is not valid JSON.
    """
    payload = json.loads(result[0].decode('utf-8'))
    # NOTE(review): 'postion' (sic) appears to be the key canal itself writes;
    # do not "correct" the spelling without checking the stored payload.
    timestamp_ms = payload['postion']['timestamp']
    # Work from an explicit UTC instant so neither the delay nor the displayed
    # time depends on the timezone of the host running the monitor.
    cursor_utc = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    delay = datetime.now(timezone.utc) - cursor_utc
    cursor_time = cursor_utc.astimezone(DISPLAY_TZ).replace(tzinfo=None)
    # total_seconds() keeps the days component; timedelta.seconds would wrap
    # every 24 hours and under-report long delays.
    return cursor_time, int(delay.total_seconds() / 60)