Ansible API

一、环境说明

当前环境说明:

  • python3.8.6
  • ansible2.10.5 

 

ansible版本差异说明:

  • ansible2.0版本前后有较大改变,鉴于当前已经到了2.10版本,不再过多说明历史2.0版本的变动。可参考文章:链接
  • ansible2.4,对于Inventory-->InventoryManager  VariableManager类的使用,导入方式进行了改变。可参考文章:链接
  • ansible2.8,对于初始化有了改变,2.7 使用了Python标准库里的命名元组来初始化选项,而2.8是Ansible自己封装了一个ImmutableDict,之后需要和 context结合使用的。

 

tip:  pip3 install ansible的方式安装,ansible.cfg文件会在site-packages/ansible/galaxy/data/container/tests下。

 

二、官方代码解析

2.1、官方示例(Ad-hoc)

Ansible API
  1 #!/usr/bin/env python
  2 
  3 from __future__ import (absolute_import, division, print_function)
  4 __metaclass__ = type
  5 
  6 import json
  7 import shutil
  8 
  9 import ansible.constants as C
 10 from ansible.executor.task_queue_manager import TaskQueueManager
 11 from ansible.module_utils.common.collections import ImmutableDict
 12 from ansible.inventory.manager import InventoryManager
 13 from ansible.parsing.dataloader import DataLoader
 14 from ansible.playbook.play import Play
 15 from ansible.plugins.callback import CallbackBase
 16 from ansible.vars.manager import VariableManager
 17 from ansible import context
 18 
 19 
 20 # Create a callback plugin so we can capture the output
 21 class ResultsCollectorJSONCallback(CallbackBase):
 22     """A sample callback plugin used for performing an action as results come in.
 23 
 24     If you want to collect all results into a single object for processing at
 25     the end of the execution, look into utilizing the ``json`` callback plugin
 26     or writing your own custom callback plugin.
 27     """
 28 
 29     def __init__(self, *args, **kwargs):
 30         super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
 31         self.host_ok = {}
 32         self.host_unreachable = {}
 33         self.host_failed = {}
 34 
 35     def v2_runner_on_unreachable(self, result):
 36         host = result._host
 37         self.host_unreachable[host.get_name()] = result
 38 
 39     def v2_runner_on_ok(self, result, *args, **kwargs):
 40         """Print a json representation of the result.
 41 
 42         Also, store the result in an instance attribute for retrieval later
 43         """
 44         host = result._host
 45         self.host_ok[host.get_name()] = result
 46         print(json.dumps({host.name: result._result}, indent=4))
 47 
 48     def v2_runner_on_failed(self, result, *args, **kwargs):
 49         host = result._host
 50         self.host_failed[host.get_name()] = result
 51 
 52 
 53 def main():
 54     host_list = [localhost, www.example.com, www.google.com]
 55     # since the API is constructed for CLI it expects certain options to always be set in the context object
 56     context.CLIARGS = ImmutableDict(connection=smart, module_path=[/to/mymodules, /usr/share/ansible], forks=10, become=None,
 57                                     become_method=None, become_user=None, check=False, diff=False)
 58     # required for
 59     # https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204
 60     sources = ,.join(host_list)
 61     if len(host_list) == 1:
 62         sources += ,
 63 
 64     # initialize needed objects
 65     loader = DataLoader()  # Takes care of finding and reading yaml, json and ini files
 66     passwords = dict(vault_pass=secret)
 67 
 68     # Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets
 69     results_callback = ResultsCollectorJSONCallback()
 70 
 71     # create inventory, use path to host config file as source or hosts in a comma separated string
 72     inventory = InventoryManager(loader=loader, sources=sources)
 73 
 74     # variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
 75     variable_manager = VariableManager(loader=loader, inventory=inventory)
 76 
 77     # instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
 78     # IMPORTANT: This also adds library dirs paths to the module loader
 79     # IMPORTANT: and so it must be initialized before calling `Play.load()`.
 80     tqm = TaskQueueManager(
 81         inventory=inventory,
 82         variable_manager=variable_manager,
 83         loader=loader,
 84         passwords=passwords,
 85         stdout_callback=results_callback,  # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
 86     )
 87 
 88     # create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
 89     play_source = dict(
 90         name="Ansible Play",
 91         hosts=host_list,
 92         gather_facts=no,
 93         tasks=[
 94             dict(action=dict(module=shell, args=ls), register=shell_out),
 95             dict(action=dict(module=debug, args=dict(msg={{shell_out.stdout}}))),
 96             dict(action=dict(module=command, args=dict(cmd=/usr/bin/uptime))),
 97         ]
 98     )
 99 
100     # Create play object, playbook objects use .load instead of init or new methods,
101     # this will also automatically create the task objects from the info provided in play_source
102     play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
103 
104     # Actually run it
105     try:
106         result = tqm.run(play)  # most interesting data for a play is actually sent to the callback‘s methods
107     finally:
108         # we always need to cleanup child procs and the structures we use to communicate with them
109         tqm.cleanup()
110         if loader:
111             loader.cleanup_all_tmp_files()
112 
113     # Remove ansible tmpdir
114     shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
115 
116     print("UP ***********")
117     for host, result in results_callback.host_ok.items():
118         print({0} >>> {1}.format(host, result._result[stdout]))
119 
120     print("FAILED *******")
121     for host, result in results_callback.host_failed.items():
122         print({0} >>> {1}.format(host, result._result[msg]))
123 
124     print("DOWN *********")
125     for host, result in results_callback.host_unreachable.items():
126         print({0} >>> {1}.format(host, result._result[msg]))
127 
128 
129 if __name__ == __main__:
130     main()
View Code

 

2.2、拆分说明

  •  导入模块详解
 1 # 初始化ansible的配置选项,比如: 指定远程用户remote_user=None
 2 from ansible.module_utils.common.collections import ImmutableDict
 3 # 上下文管理器,他就是用来接收 ImmutableDict 的示例对象
 4 from ansible import context
 5 # 管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件
 6 from ansible.parsing.dataloader import DataLoader
 7 # 变量管理器,包括主机,组,扩展等变量
 8 from ansible.vars.manager import VariableManager
 9 # 用于创建和管理inventory,导入inventory文件
10 from ansible.inventory.manager import InventoryManager
11 # 用于执行 Ad-hoc 的类
12 from ansible.playbook.play import Play
13 # ad-hoc ansible底层用到的任务队列
14 from ansible.executor.task_queue_manager import TaskQueueManager
15 # 回调基类,用来定义回调事件,比如返回失败成功等信息
16 from ansible.plugins.callback import CallbackBase
17 # 执行playbook
18 from ansible.executor.playbook_executor import PlaybookExecutor
19 # 操作单个主机,可以给主机添加变量等操作
20 from ansible.inventory.host import Host
21 # 操作单个主机组,可以给组添加变量等操作
22 from ansible.inventory.group import Group
23 # 用于获取 ansible 产生的临时文档
24 import ansible.constants as C

 

  • 回调类
 1 class ResultsCollectorJSONCallback(CallbackBase):
 2     """
 3     回调插件就是一个类。 将执行命令的结果放到一个对象中,一边用json或自定义插件,获取执行结果的。 我们可以改写这个类,以便满足我们查询执行结果格式的需求。
 4     """
 5 
 6     def __init__(self, *args, **kwargs):
 7         super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
 8         self.host_ok = {}
 9         self.host_unreachable = {}
10         self.host_failed = {}
11 
12     def v2_runner_on_unreachable(self, result):
13         host = result._host
14         self.host_unreachable[host.get_name()] = result
15 
16     def v2_runner_on_ok(self, result, *args, **kwargs):
17         """
18         将结果以json形式打印。indent=4 是json格式美化参数,便于阅读
19         将结果存储在实例属性中,以便稍后检索
20         """
21         host = result._host
22         self.host_ok[host.get_name()] = result
23         print(json.dumps({host.name: result._result}, indent=4))
24 
25     def v2_runner_on_failed(self, result, *args, **kwargs):
26         host = result._host
27         self.host_failed[host.get_name()] = result

 

  • 初始化选项和source
1 # 构建一些初始化选项,在context对象中被设置
2 context.CLIARGS = ImmutableDict(connection=smart, module_path=[/to/mymodules, /usr/share/ansible], forks=10, become=None,
3                                 become_method=None, become_user=None, check=False, diff=False)
4 # source指inventory文件数据,默认指/etc/ansible/hosts,也可读取yaml/json/ini格式的文件,或者主机名以逗号隔开的字符串。
5 host_list = [localhost, www.example.com, www.google.com]
6 sources = ,.join(host_list)
7 if len(host_list) == 1:
8     sources += ,

 

  •  实例化对象
 1 # 初始化所需的对象集
 2 # 管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件
 3 loader = DataLoader()
 4 # 密码这里是必须使用的一个参数,假如通过公钥信任,也可以给一个空字典:dict()
 5 passwords = dict(vault_pass=secret)
 6 
 7 # 回调对象,用于查询执行结果
 8 results_callback = ResultsCollectorJSONCallback()
 9 
10 # 资源管理器,创建inventory对象,引入上面的source。
11 # 也可以:sources=‘/etc/ansible/hosts‘。也可以:sources=‘host1,host2,...‘
12 inventory = InventoryManager(loader=loader, sources=sources)
13 
14 # 变量管理器,管理变量
15 variable_manager = VariableManager(loader=loader, inventory=inventory)

 

  •  任务队列管理器
 1 # 实例化任务队列管理器
 2 # tip: 会加载库目录到loader里
 3 # tip: 必须在`Play.load()`之前初始化队列实例
 4 tqm = TaskQueueManager(
 5     inventory=inventory,
 6     variable_manager=variable_manager,
 7     loader=loader,
 8     passwords=passwords,
 9     stdout_callback=results_callback,  # 回调实例
10 )

 

  • ansible命令模块和参数
 1 play_source = dict(
 2     name="Ansible Play",  # 名字,只未方便阅读
 3     hosts=host_list,      # 执行主机列表
 4     gather_facts=no,
 5     tasks=[
 6         dict(action=dict(module=shell, args=ls), register=shell_out),
 7         dict(action=dict(module=debug, args=dict(msg={{shell_out.stdout}}))),
 8         dict(action=dict(module=command, args=dict(cmd=/usr/bin/uptime))),
 9     ]
10 )

 

  • 执行
 1 # 定义play对象
 2 play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
 3 
 4 # 在队列中执行play对象
 5 try:
 6     result = tqm.run(play)  # 执行的结果返回码,成功是 0
 7 finally:
 8     # 如果 `tqm` 不是 `None`, 需要清理子进程和我们用来与它们通信的结构。
 9     tqm.cleanup()
10     if loader:
11         loader.cleanup_all_tmp_files()
12 
13 # 最后删除 ansible 产生的临时目录
14 # 这个临时目录会在 ~/.ansible/tmp/ 目录下
15 shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

 

  • 查询结果 
 1 print("UP ***********")
 2 for host, result in results_callback.host_ok.items():
 3     print({0} >>> {1}.format(host, result._result[stdout]))
 4 
 5 print("FAILED *******")
 6 for host, result in results_callback.host_failed.items():
 7     print({0} >>> {1}.format(host, result._result[msg]))
 8 
 9 print("DOWN *********")
10 for host, result in results_callback.host_unreachable.items():
11     print({0} >>> {1}.format(host, result._result[msg]))

 

2.3、调用playbook

 1 from ansible.executor.playbook_executor import PlaybookExecutor
 2 
 3 playbook = PlaybookExecutor(playbooks=[/root/test.yml],  # tip:这是列表
 4                  inventory=inventory,
 5                  variable_manager=variable_manager,
 6                  loader=loader,
 7                  passwords=passwords)
 8 
 9 # 使用回调函数
10 playbook._tqm._stdout_callback = results_callback
11 
12 result = playbook.run()
13 
14 for host, result in results_callback.host_ok.items():
15     print("主机{}, 执行结果{}".format(host, result._result[result][stdout])

 

三、二次开发

Ansible API
  1 # -*- coding:utf-8 -*-
  2 import json
  3 import shutil
  4 from ansible.module_utils.common.collections import ImmutableDict
  5 from ansible import context
  6 from ansible.parsing.dataloader import DataLoader
  7 from ansible.vars.manager import VariableManager
  8 from ansible.inventory.manager import InventoryManager
  9 from ansible.playbook.play import Play
 10 from ansible.executor.playbook_executor import PlaybookExecutor
 11 from ansible.executor.task_queue_manager import TaskQueueManager
 12 from ansible.plugins.callback import CallbackBase
 13 import ansible.constants as C
 14 
 15 
 16 class ResultCallback(CallbackBase):
 17     """
 18     重写callbackBase类的部分方法
 19     """
 20 
 21     def __init__(self, *args, **kwargs):
 22         # python3支持这样的语法
 23         # super().__init__(*args, **kwargs)
 24         # python2要用这样的语法
 25         super(ResultCallback, self).__init__(*args, **kwargs)
 26         self.host_ok = {}
 27         self.host_unreachable = {}
 28         self.host_failed = {}
 29         # self.task_ok = {}
 30 
 31     def v2_runner_on_unreachable(self, result):
 32         self.host_unreachable[result._host.get_name()] = result
 33 
 34     def v2_runner_on_ok(self, result, **kwargs):
 35         self.host_ok[result._host.get_name()] = result
 36 
 37     def v2_runner_on_failed(self, result, **kwargs):
 38         self.host_failed[result._host.get_name()] = result
 39 
 40 
 41 class MyAnsiable():
 42     def __init__(self,
 43                  connection=local,
 44                  module_path=None,
 45                  remote_user=None,
 46                  ack_pass=None,
 47                  sudo=None,
 48                  sudo_user=None,
 49                  ask_sudo_pass=None,
 50                  become=None,
 51                  become_method=None,
 52                  become_user=None,
 53                  listhosts=None,
 54                  listtasks=None,
 55                  listtags=None,
 56                  verbosity=3,
 57                  syntax=None,
 58                  start_at_task=None,
 59                  check=False,
 60                  diff=False,
 61                  inventory=None,
 62                  passwords=None):
 63         """
 64         初始化函数,定义的默认的选项值,
 65         在初始化的时候可以传参,以便覆盖默认选项的值
 66         """
 67         context.CLIARGS = ImmutableDict(
 68             connection=connection,
 69             module_path=module_path,
 70             remote_user=remote_user,
 71             ack_pass=ack_pass,
 72             sudo=sudo,
 73             sudo_user=sudo_user,
 74             ask_sudo_pass=ask_sudo_pass,
 75             become=become,
 76             become_method=become_method,
 77             become_user=become_user,
 78             listhosts=listhosts,
 79             listtasks=listtasks,
 80             listtags=listtags,
 81             verbosity=verbosity,
 82             syntax=syntax,
 83             start_at_task=start_at_task,
 84             check=check,
 85             diff=diff
 86         )
 87 
 88         # 三元表达式,假如没有传递 inventory, 就使用 "localhost,"
 89         # 确定 inventory 文件
 90         self.inventory = inventory if inventory else "localhost,"
 91 
 92         # 三元表达式,假如没有传递 passwords, 就使用 {}
 93         # 设置密码,可以为空字典,但必须有此参数
 94         self.passwords = passwords if passwords else {}
 95 
 96         # 实例化数据解析器
 97         self.loader = DataLoader()
 98 
 99         # 实例化 资产配置对象
100         self.inventory_manager_obj = InventoryManager(loader=self.loader, sources=self.inventory)
101 
102         # 变量管理器
103         self.variable_manager_obj = VariableManager(self.loader, self.inventory_manager_obj)
104 
105         # 实例化回调插件对象
106         self.results_callback = ResultCallback()
107 
108     def adhoc(self, name=Ad-hoc, hosts=localhost, gather_facts="no", module="ping", args=‘‘):
109         play_source = dict(
110             name=name,
111             hosts=hosts,
112             gather_facts=gather_facts,
113             tasks=[
114                 # 这里每个 task 就是这个列表中的一个元素,格式是嵌套的字典
115                 # 也可以作为参数传递过来,这里就简单化了。
116                 {"action": {"module": module, "args": args}},
117             ])
118 
119         play = Play().load(play_source, variable_manager=self.variable_manager_obj, loader=self.loader)
120 
121         tqm = None
122         try:
123             tqm = TaskQueueManager(
124                 inventory=self.inventory_manager_obj,
125                 variable_manager=self.variable_manager_obj,
126                 loader=self.loader,
127                 passwords=self.passwords,
128                 stdout_callback=self.results_callback
129             )
130             result = tqm.run(play)
131         finally:
132             if tqm is not None:
133                 tqm.cleanup()
134             shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
135 
136     def playbook(self, playbooks):
137         playbook = PlaybookExecutor(playbooks=playbooks,  # Tip: 这里是一个列表
138                                     inventory=self.inventory_manager_obj,
139                                     variable_manager=self.variable_manager_obj,
140                                     loader=self.loader,
141                                     passwords=self.passwords
142                                     )
143 
144         # 使用回调函数
145         playbook._tqm._stdout_callback = self.results_callback
146         # 执行
147         result = playbook.run()
148 
149     def get_result(self):
150         result_raw = {success: {}, failed: {}, unreachable: {}}
151         # print(self.results_callback.host_ok)
152         for host, result in self.results_callback.host_ok.items():
153             result_raw[success][host] = result._result
154 
155         # print(self.results_callback.host_failed)
156         for host, result in self.results_callback.host_failed.items():
157             result_raw[failed][host] = result._result
158 
159         # print(self.results_callback.host_unreachable)
160         for host, result in self.results_callback.host_unreachable.items():
161             result_raw[unreachable][host] = result._result
162 
163         # 打印结果,并且使用 JSON 格式化
164         print(json.dumps(result_raw, indent=4))
View Code

 

四、运行示例

4.1、执行默认ad-hoc

# -*- coding:utf-8 -*-
from MyAnsible import MyAnsiable

ansible_obj = MyAnsiable()
ansible_obj.adhoc()
ansible_obj.get_result()

结果:

Ansible API
{
    "success": {
        "localhost": {
            "ping": "pong",
            "invocation": {
                "module_args": {
                    "data": "pong"
                }
            },
            "ansible_facts": {
                "discovered_interpreter_python": "/usr/bin/python"
            },
            "_ansible_no_log": false,
            "changed": false
        }
    },
    "failed": {},
    "unreachable": {}
}
View Code

 

4.2、远程执行命令

# 文件名: /root/ansible-playbook/api/hosts
[tapi]
10.96.0.59

# 文件名: ansible03.py
# -*- coding:utf-8 -*-
from MyAnsible import MyAnsiable

# 配置资产配置文件,并使用 ssh 的远程连接方式
ansible_obj = MyAnsiable(inventory=/root/ansible-playbook/api/hosts, connection=smart)
# 执行对象是hosts里的组名
ansible_obj.adhoc(hosts=tapi, module=shell, args=ip a |grep "inet")
ansible_obj.get_result()

结果:

Ansible API
{
    "failed": {}, 
    "success": {
        "10.96.0.59": {
            "stderr_lines": [], 
            "cmd": "ip a |grep \"inet\"", 
            "end": "2021-01-28 15:41:25.972706", 
            "_ansible_no_log": false, 
            "stdout": "    inet 127.0.0.1/8 scope host lo\n    inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0", 
            "changed": true, 
            "rc": 0, 
            "start": "2021-01-28 15:41:25.960580", 
            "stderr": "", 
            "delta": "0:00:00.012126", 
            "invocation": {
                "module_args": {
                    "creates": null, 
                    "executable": null, 
                    "_uses_shell": true, 
                    "strip_empty_ends": true, 
                    "_raw_params": "ip a |grep \"inet\"", 
                    "removes": null, 
                    "argv": null, 
                    "warn": true, 
                    "chdir": null, 
                    "stdin_add_newline": true, 
                    "stdin": null
                }
            }, 
            "stdout_lines": [
                "    inet 127.0.0.1/8 scope host lo", 
                "    inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0"
            ], 
            "ansible_facts": {
                "discovered_interpreter_python": "/usr/bin/python"
            }
        }
    }, 
    "unreachable": {}
}
View Code

 

4.3、调用playbook

研究过程中,发现不能传递额外的系统变量, 后面实例懒的写了,看完我前面的代码,执行playbook应该不成问题。

 

参考资料

  • 二开:https://zhuanlan.zhihu.com/p/118411015
  •            http://www.linuxboy.net/ansiblejc/143275.html
  • ansible2.4版本前后模块差异:https://blog.csdn.net/qq_29832217/article/details/97005626
  • 常用模块使用:https://blog.csdn.net/yyy72999/article/details/81184720
  • 官网2.7  https://docs.ansible.com/ansible/2.7/dev_guide/developing_api.html
  • 官网2.8  https://docs.ansible.com/ansible/2.8/dev_guide/developing_api.html
  • 官网2.10  https://docs.ansible.com/ansible/latest/dev_guide/developing_api.html

 

Ansible API

上一篇:【第二届PHP全球开发者大会】惠新宸(鸟哥):PHP7性能之源


下一篇:PostgreSQL PHP 开发者手册