supervisor的event机制其实,就是一个监控/通知的框架。抛开这个机制实现的过程来说的话,event其实就是一串数据,这串数据里面有head和body两部分。咱们先弄清楚event数据结构,咱们才能做后续的处理。先看看header长啥样的吧
1
|
ver: 3.0 server:supervisor serial: 21 pool:listener poolserial: 10 eventname:PROCESS_COMMUNICATION_STDOUT len : 54
|
来说说上面的这个header每一项,都是什么?
ver:表示event协议的版本,目前是3.0
server:表示supervisor的标识符,也就是咱们上一篇中[supervisord]块中的identifier选项中的东西
默认为supervisor
serial:这个东西是每个event的序列号,supervisord在运行过程中,发送的第一个event的序列号就是
1,接下来的event依次类推
pool:这个是你的listener的pool的名字,一般你的listener只启动一个进程的的话,其实也就没有 pool的概念了。名字就是[eventlistener:theeventlistenername]这个东西
poolserial:上面的serial是supervisord给每个event的编号。 而poolserial则是,eventpool给发送
到我这个pool过来的event编的号
eventname:这个是event的类型名称,这个后面说。
len:这个长度,表示的是header后面的body部分的长度。header之后,我们会取len长度的内容作为 body。
好,说完了header,咱们就该说说body部分的数据结构了。body的数据结构,其实是和event的具体类型相关的,不同的event的类型,header的结构都一样,但是body的结构大多就不一样了。
关于event类型,咱们就不展开说了,因为太多了,具体大伙可以去参阅一下官网。其实搞会一个,其他也都一个样。
咱们这里说说待会一个要用到的类型就OK了,啥类型呢?
是PROCESS_STATE_EXITED
看着这名字,大伙差不多也就知道它是干什么的了。PROCESS_STATE_EXITED其实就是,当supervisord管理的子进程退出的时候,supervisord就会产生PROCESS_STATE_EXITED这么个event。
来看看PROCESS_STATE_EXITED长啥样吧,header咱们前面说过了,都一样。来看看body部分
1
|
processname:cat groupname:cat from_state:RUNNING expected: 0 pid: 2766
|
来说说具体含义
processname:就是进程名字,这里名字不是我们实际进程的名字,而是咱们[program:x]配置成的名字
groupname:组名,这个一个样
from_state:这个是,我们的进程退出前的状态是什么状态
expected:这个咱们前面也讲过,默认情况下exitcodes是0和2,也就是说0和2是expected。其它的退出
码,也就是unexpected了
pid:这个大伙想必都知道。
OK,说到了这里,我们知道了event的产生,然后给我们的listener这么一种结构的数据。
现在我们有数据了,就看咱们怎么去处理这些数据了,这个过程就仁者见仁,智者见智了。我们可以利用接收的数据,加工后,进行报警,等等操作。
处理数据之前,咱们还得要来了解一下,listener和supervisord之间的通信过程
在这里我们首先要搞清楚,event的发起方和接收方。
event的发起方是supervisord进程,接收方是一个叫listener的东西,listener怎么配置,上一篇参数详解里面已经写的很清楚了,大伙可以去参考下,这里就不赘述了。其实listener和program一样,都是supervisord的子进程。两者的在配置上,很多选项也都一样。
其实,event还有另外一个过程,我们的program也就是我们要管理的进程,也可以发送event,进而和supervisord主动通信。不过program程序一般都是程序员们搞,咱们搞运维的就不管他们的事情了
OK,看看event协议。
协议其实很简单。
当supervisord启动的时候,如果我们的listener配置为autostart=true的话,listener就会作为supervisor的子进程被启动。
listener被启动之后,会向自己的stdout写一个"READY"的消息,此时父进程也就是supervisord读取到这条消息后,会认为listener处于就绪状态。
listener处于就绪状态后,当supervisord产生的event在listener的配置的可接受的events中时,supervisord就会把该event发送给该listener。
listener接收到event后,我们就可以根据event的head,body里面的数据,做一些列的处理了。我们根据event的内容,判断,提取,报警等等操作。
该干的活都干完之后,listener需要向自己的stdout写一个消息"RESULT\nOK",supervisord接受到这条消息后。就知道listener处理event完毕了。
好,来看看例子吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
#!/usr/bin/env python #coding:utf-8 import sys
import os
import subprocess
#childutils这个模块是supervisor的一个模型,可以方便我们处理event消息。。。当然我们也可以自己按照协议,用任何语言来写listener,只不过用childutils更加简便罢了 from supervisor import childutils
from optparse import OptionParser
import socket
import fcntl
import struct
__doc__ = "\033[32m%s,捕获PROCESS_STATE_EXITED事件类型,当异常退出时触发报警\033[0m" % sys.argv[ 0 ]
def write_stdout(s):
sys.stdout.write(s)
sys.stdout.flush()
#定义异常,没啥大用其实 class CallError(Exception):
def __init__( self ,value):
self .value = value
def __str__( self ):
return repr ( self .value)
#定义处理event的类 class ProcessesMonitor():
def __init__( self ):
self .stdin = sys.stdin
self .stdout = sys.stdout
def runforever( self ):
#定义一个无限循环,可以循环处理event,当然也可以不用循环,把listener的autorestart#配置为true,处理完一次event就让该listener退出,然后supervisord重启该listener,这样listen#er就可以处理新的event了
while 1 :
#下面这个东西,是向stdout发送"READY",然后就阻塞在这里,一直等到有event发过来
#headers,payload分别是接收到的header和body的内容
headers, payload = childutils.listener.wait( self .stdin, self .stdout)
#判断event是否是咱们需要的,不是的话,向stdout写入"RESULT\NOK",并跳过当前
#循环的剩余部分
if not headers[ 'eventname' ] = = 'PROCESS_STATE_EXITED' :
childutils.listener.ok( self .stdout)
continue
pheaders,pdata = childutils.eventdata(payload + '\n' )
#判读event是否是expected是否是expected的,expected的话为1,否则为0
#这里的判断是过滤掉expected的event
if int (pheaders[ 'expected' ]):
childutils.listener.ok( self .stdout)
continue
ip = self .get_ip( 'eth0' )
#构造报警信息结构
msg = "[Host:%s][Process:%s][pid:%s][exited unexpectedly fromstate:%s]" % (ip,pheaders[ 'processname' ],pheaders[ 'pid' ],pheaders[ 'from_state' ])
#调用报警接口,这个接口是我们公司自己开发的,大伙不能用的,要换成自己的接口
subprocess.call( "/usr/local/bin/alert.py -m '%s'" % msg,shell = True )
#stdout写入"RESULT\nOK",并进入下一次循环
childutils.listener.ok( self .stdout)
'''def check_user(self):
userName = os.environ['USER']
if userName != 'root':
try:
raise MyError('must be run by root!')
except MyError as e:
write_stderr( "Error occurred,value:%s\n" % e.value)
sys.exit(255)'''
def get_ip( self ,ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
inet = fcntl.ioctl(s.fileno(), 0x8915 , struct.pack( '256s' , ifname[: 15 ]))
ret = socket.inet_ntoa(inet[ 20 : 24 ])
return ret
def main():
parser = OptionParser()
if len (sys.argv) = = 2 :
if sys.argv[ 1 ] = = '-h' or sys.argv[ 1 ] = = '--help' :
print __doc__
sys.exit( 0 )
#(options, args) = parser.parse_args()
#下面这个,表示只有supervisord才能调用该listener,否则退出
if not 'SUPERVISOR_SERVER_URL' in os.environ:
try :
raise CallError( "%s must be run as a supervisor event" % sys.argv[ 0 ])
except CallError as e:
write_stderr( "Error occurred,value: %s\n" % e.value)
return
prog = ProcessesMonitor()
prog.runforever()
if __name__ = = '__main__' :
main()
|
差不多就这些了,其他常用的event类型,已经listener的三种状态,已经怎么转换的。大伙可以去官网上看看