运维
Tcpdump抓包工具
tshark抓包工具
Ansible
Ansible配置
Ansible-远程命令模块( command / script / shell )
Ansible-常用模块
PlayBook
PlayBook-变量
PlayBook-条件/循环
PlayBook-Tags
PlayBook-常用脚本
Ansible-Vault(数据安全)
Ansible-API
Ansible实践
JMeter测试软件
JMeter性能指标
Curl
综合分析工具
磁盘/IO工具
网络分析工具
JAVA分析工具
更换硬盘
Linux启动流程
安装问题
GURB加密
修改默认启动项
Root密码忘记
重装内核、GRUB
Too many open files错误
误删文件,内存恢复
Read-only file system错误
本文档使用MrDoc发布
返回首页
-
+
Ansible-API
2022年3月16日 17:33
admin
#参考资料: https://www.jianshu.com/p/ec1e4d8438e9 --- #ansible(2.8 - 2.9 版本) ##模块参数说明 ![](/media//202203/2022-03-16174022889399.png) --- #整体流程 --- ##回调插件 ![](/media//202203/2022-03-16174543733720.png) --- ##选项 ![](/media//202203/2022-03-16174629421891.png) --- ##数据解析器、密码和回调插件对象 ![](/media//202203/2022-03-16174658871624.png) --- ##创建资源库对象 ![](/media//202203/2022-03-16174745238979.png) --- ##变量管理器 ![](/media//202203/2022-03-16174803205494.png) --- ##创建一个 Ad-hoc ![](/media//202203/2022-03-16174835024974.png) --- ##创建一个 play 对象 ![](/media//202203/2022-03-16174917498446.png) --- ##任务队列管理器 ![](/media//202203/2022-03-16174946100870.png) --- #Ad-Hoc示例:(ansible 2.8-2.9 版本) ![](/media//202203/2022-03-16173934258365.png) #!/usr/bin/python import json import shutil import ansible.constants as C from ansible.executor.task_queue_manager import TaskQueueManager from ansible.module_utils.common.collections import ImmutableDict from ansible.inventory.manager import InventoryManager from ansible.parsing.dataloader import DataLoader from ansible.playbook.play import Play from ansible.plugins.callback import CallbackBase from ansible.vars.manager import VariableManager from ansible import context #创建回调 class ResultsCollectorJSONCallback(CallbackBase): def __init__(self, *args, **kwargs): super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs) self.host_ok = {} self.host_unreachable = {} self.host_failed = {} def v2_runner_on_unreachable(self, result): host = result._host self.host_unreachable[host.get_name()] = result def v2_runner_on_ok(self, result, *args, **kwargs): host = result._host self.host_ok[host.get_name()] = result #print(json.dumps({host.name: result._result}, indent=4)) def v2_runner_on_failed(self, result, *args, **kwargs): host = result._host self.host_failed[host.get_name()] = result def main(): host_list = ['172.16.93.190'] #初始化选项参数 context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,become_method=None, become_user=None, check=False, diff=False, syntax=None,start_at_task=None,verbosity=0) #动态inventory # https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204 #sources = ','.join(host_list) #if len(host_list) == 1: # sources += ',' # initialize needed objects loader = DataLoader() # Takes care of finding and reading yaml, json and ini files passwords = dict(vault_pass='secret') #初始化回调实例 results_callback = ResultsCollectorJSONCallback() #定义inventory inventory = InventoryManager(loader=loader, sources='/etc/ansible/hosts') #定义变量文件 variable_manager = VariableManager(loader=loader, inventory=inventory) #定义执行队列 tqm = TaskQueueManager( inventory=inventory, variable_manager=variable_manager, loader=loader, passwords=passwords, stdout_callback=results_callback, # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout ) #构建执行体 play_source = dict( name="Ansible Play", hosts=host_list, gather_facts='no', tasks=[ #dict(action=dict(module='shell', args='ls'), register='shell_out'), dict(action=dict(module='shell', args='df -hT')), #dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))), #dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))), ] ) play = Play().load(play_source, variable_manager=variable_manager, loader=loader) #执行 try: result = tqm.run(play) finally: tqm.cleanup() if loader: loader.cleanup_all_tmp_files() # Remove ansible tmpdir #shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) #打印结果 print("UP ***********") for host, result in results_callback.host_ok.items(): print('{0} >>> {1}'.format(host, result._result['stdout'].encode("utf8"))) print("FAILED *******") for host, result in results_callback.host_failed.items(): print('{0} >>> {1}'.format(host, result._result['msg'])) print("DOWN *********") for host, result in results_callback.host_unreachable.items(): print('{0} >>> {1}'.format(host, result._result['msg'])) if __name__ == '__main__': main() --- #PlayBook示例:(ansible 2.8-2.9 版本) ![](/media//202203/2022-03-16175121906757.png) #!/usr/bin/python import json import shutil import ansible.constants as C from ansible.executor.task_queue_manager import TaskQueueManager from ansible.module_utils.common.collections import ImmutableDict from ansible.inventory.manager import InventoryManager from ansible.parsing.dataloader import DataLoader #from ansible.playbook.play import Play from ansible.executor.playbook_executor import PlaybookExecutor from ansible.plugins.callback import CallbackBase from ansible.vars.manager import VariableManager from ansible import context #创建回调 class ResultsCollectorJSONCallback(CallbackBase): def __init__(self, *args, **kwargs): super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs) self.host_ok = {} self.host_unreachable = {} self.host_failed = {} def v2_runner_on_unreachable(self, result): host = result._host self.host_unreachable[host.get_name()] = result def v2_runner_on_ok(self, result, *args, **kwargs): host = result._host self.host_ok[host.get_name()] = result #print(json.dumps({host.name: result._result}, indent=4)) def v2_runner_on_failed(self, result, *args, **kwargs): host = result._host self.host_failed[host.get_name()] = result def main(): host_list = ['172.16.93.190'] #初始化参数 context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,become_method=None, become_user=None, check=False, diff=False,syntax=None,start_at_task=None, verbosity=0) # initialize needed objects loader = DataLoader() # Takes care of finding and reading yaml, json and ini files passwords = dict(vault_pass='secret') #初始化回调实例 results_callback = ResultsCollectorJSONCallback() #定义inventory inventory = InventoryManager(loader=loader, sources='/etc/ansible/hosts') #定义变量文件 variable_manager = VariableManager(loader=loader, inventory=inventory) #构建执行体 playbook = PlaybookExecutor(playbooks=['/root/test.yml'], inventory=inventory, variable_manager=variable_manager, loader=loader, passwords=passwords) #使用playbook回调函数 playbook._tqm._stdout_callback = results_callback #执行 result = playbook.run() # Remove ansible tmpdir #shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) #打印结果 print("UP ***********") for host, result in results_callback.host_ok.items(): print('{0} >>> {1}'.format(host, result._result['stdout'].encode("utf8"))) print("FAILED *******") for host, result in results_callback.host_failed.items(): print('{0} >>> {1}'.format(host, result._result['msg'])) print("DOWN *********") for host, result in results_callback.host_unreachable.items(): print('{0} >>> {1}'.format(host, result._result['msg'])) if __name__ == '__main__': main()
分享到: