现在终于开始学习salt的api了,有些小激动啊,我们执行命令的时候,后台到底是如何处理的,发生什么了事情,我对着一切有着强烈的好奇心啊。
这些是saltstack命令对应的api:
salt --->salt.client.LocalClient salt-cp --->salt.cli.cp.SaltCP salt-key --->salt.key.KeyCLI salt-call --->salt.cli.caller.caller salt-run --->salt.runner.Runner salt-ssh --->salt.client.ssh.SSH
首先学习的是salt命令对应的api:salt.client.LocalClient。
首先我们以一个简单的命令去讲解salt是如何处理。
salt ‘*‘ test.ping
这个首先会调用salt.cli.SaltCMD类实例化一个对象
主要代码如下:salt/cli/__init__.py
class SaltCMD(parsers.SaltCMDOptionParser): def run(self): ... # 设置日志的 # 调用salt.client.LocalClient api try: local = salt.client.LocalClient(self.get_config_file_path()) except SaltClientError as exc: self.exit(2, ‘{0}\n‘.format(exc)) return # self.options是命令行参数的集合 # 通过optparse模块解析命令行参数 # 批量执行的,salt -b if self.options.batch: batch = salt.cli.batch.Batch(self.config) # Printing the output is already taken care of in run() itself for res in batch.run(): pass else: # 处理需要传递的参数,kwargs是参数字典,传递时使用**kwargs .... # 异步执行,不等待结果返回 if self.config[‘async‘]: jid = local.cmd_async(**kwargs) # 返回jid print(‘Executed command with job ID: {0}‘.format(jid)) return try: # 接下来是主要逻辑 # 默认调用cmd_cli方法返回生成器,循环输出结果 # 在cmd_cli里有个get_cli_event_returns方法会去调用gather_job_info # gather_job_info是去触发saltutil.find_job任务 # 是去验证操作是否还在执行 if local: if self.options.subset: # 选择几个Minion调用cmd_cli cmd_func = local.cmd_subset kwargs[‘sub‘] = self.options.subset kwargs[‘cli‘] = True else: cmd_func = local.cmd_cli if self.options.static: if self.options.verbose: kwargs[‘verbose‘] = True full_ret = local.cmd_full_return(**kwargs) ret, out = self._format_ret(full_ret) self._output_ret(ret, out) elif self.config[‘fun‘] == ‘sys.doc‘: ret = {} out = ‘‘ for full_ret in local.cmd_cli(**kwargs): ret_, out = self._format_ret(full_ret) ret.update(ret_) self._output_ret(ret, out) else: if self.options.verbose: kwargs[‘verbose‘] = True for full_ret in cmd_func(**kwargs): ret, out = self._format_ret(full_ret) self._output_ret(ret, out)
从上面代码片段我们知道SaltCMD主要是解析命令行参数,通过参数来决定调用salt.client.LocalClient的哪个方法,默认是调用cmd_cli方法。
那么真正处理操作的是LocalClient类,LocalClient主要包含的方法是cmd_*,get_*_returns,gather_job_info,run_job,pub。
首先学习下它怎么使用吧。使用的是ipython学习的
:import salt.client # 初始化实例,参数必须是master的配置文件路径 :local = salt.client.LocalClient(‘/opt/app/salt/etc/master‘) # cmd(tgt,fun,arg,expr_form=‘glob‘,...) :local.cmd(‘192.168.110.132‘,‘test.ping‘) #返回的结果 : {‘192.168.110.132‘: True} # local.cmd_async(‘192.168.110.132‘,‘test.ping‘) # 返回结果是个jid : ‘20140813164423244819‘
cmd代码片段如下:
def cmd( self, tgt, # salt target ‘192.168.110.132‘ ‘os:Linux‘ fun, # 执行模块 test.ping arg=(), # 模块参数 timeout=None, expr_form=‘glob‘, # target的类型, ret=‘‘, # returners kwarg=None, **kwargs): arg = condition_kwarg(arg, kwarg) # 将参数合并成一个列表 # run_job返回的结果形式: #{‘jid‘: ‘20131219215650131543‘, ‘minions‘: [‘jerry‘,‘sdfsd‘,]} pub_data = self.run_job(tgt, fun, arg, expr_form, ret, timeout, **kwargs) if not pub_data: return pub_data # 需要等待timeout时间才会返回结果 return self.get_returns(pub_data[‘jid‘], pub_data[‘minions‘], self._get_timeout(timeout))
其他的cmd_*形式都相似的,都会调用run_job方法,只不过返回结果不一样而已,比如cmd_async是异步执行的,不用等待,直接返回一个jid。cmd_cli返回的是生成器并会调用
gather_job_info方法去触发saltutil.find_job任务。cmd_cli是默认的处理方法。
run_job代码如下:
# run_job返回的结果形式: # {‘jid‘: ‘20131219215650131543‘, ‘minions‘: [‘jerry‘,‘sdfsd‘,]} def run_job( self, tgt, fun, arg=(), expr_form=‘glob‘, ret=‘‘, timeout=None, kwarg=None, **kwargs): arg = condition_kwarg(arg, kwarg) jid = ‘‘ # 这个可以自定义jid # Subscribe to all events and subscribe as early as possible self.event.subscribe(jid) # 订阅event pub_data = self.pub( tgt, fun, arg, expr_form, ret, jid=jid, timeout=self._get_timeout(timeout), **kwargs) return self._check_pub_data(pub_data)
pub代码片段如下:
#payload_kwargs信息如下: { ‘cmd‘: ‘publish‘, ‘tgt‘: tgt, ‘fun‘: fun, ‘arg‘: arg, ‘key‘: self.key, ‘tgt_type‘: expr_form, ‘ret‘: ret, ‘jid‘: jid, ‘user‘:‘root‘ } master_uri = ‘tcp://‘ + salt.utils.ip_bracket(self.opts[‘interface‘]) + ‘:‘ + str(self.opts[‘ret_port‘]) sreq = salt.transport.Channel.factory(self.opts, crypt=‘clear‘, master_uri=master_uri) # 使用zeromq通信,使用的是rep/req socket # payload信息 #{‘enc‘: ‘clear‘, # ‘load‘: {‘jid‘: ‘20140807004713142582‘, # ‘minions‘: [‘192.168.79.47‘, ‘192.168.110.132‘, ‘192.168.110.133‘]}} try: payload = sreq.send(payload_kwargs) except SaltReqTimeoutError: log.error( ‘Salt request timed out. If this error persists, ‘ ‘worker_threads may need to be increased.‘ ) return {}
讲了这么多,现在总结下流程。
salt ‘*‘ test.ping的流程,类似于卸妆的过程,看到最后的面目。
1、调用SaltCMD处理命令行参数,决定调用LocalClient的哪个方法(默认是cmd_cli)
2、调用LocalClient处理
3、默认调用cmd_cli方法处理
4、调用run_job方法处理
5、调用pub方法处理
6、使用zmq req/req socket通信
7、调用gather_job_info触发saltutil.find_job任务
8、一层层返回结果
从这个流程可知,其实最终调用的就是pub方法。
local.cmd_async(‘192.168.110.132‘,‘test.ping‘) local.pub(‘192.168.110.132‘,‘test.ping‘)
salt命令行的API的讲解就到这里。
本文出自 “fly天地” 博客,请务必保留此出处http://liuping0906.blog.51cto.com/2516248/1539978