README:
1、需求
- [ ] 利用RibbitMQ进行数据交互
- [ ] 可以对多台服务器进行批量操作
- [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
- [ ] 实现异步操作
备注
- [ ] RabbitMQ队列:
①执行命令时,队列名为“rpc_queue2”
②查询数据时,用的是回调时随机生成的callback_queue名
③conf/settings——Rabbitmq地址“192.168.17.102”,端口:5672,用户名:admin,密码:admin
- [ ] SSH:
RPC_Server/server.py——paramiko操作连接的测试Linux默认端口22,用户名:root,密码:123456
- [ ] threading多线程:
实现命令执行后不等待执行结果,依然可以输入新的指令
- [ ] 执行命令格式:
-->>run ifconfig host 192.168.20.22 192.168.20.23
dir server端要执行的命令
host host后可跟一个或多个可以通过rabbitMQ的服务器地址
- [ ] 查看后台所有的TASK_ID信息:
-->>check_task
显示结果样式:TASK_ID【76786】 HOST【192.168.20.22】 COMMAND【dir】
TASK_ID【10307】 HOST【192.168.20.23】 COMMAND【dir】
- [ ] 查看TASK_ID对应的执行结果:
-->>get_task 10307
程序目录结构:
├── README.md
├── RPC_Client
│ ├── bin
│ │ ├── __init__.py
│ │ └── start.py #客户端启动程序
│ ├── conf
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-36.pyc
│ │ │ └── settings.cpython-36.pyc
│ │ └── settings.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── main.py
│ │ └── __pycache__
│ │ ├── __init__.cpython-36.pyc
│ │ └── main.cpython-36.pyc
│ └── modules
│ ├── client.py
│ ├── __init__.py
│ └── __pycache__
│ ├── client.cpython-36.pyc
│ └── __init__.cpython-36.pyc
└── RPC_Server
├── conf
│ ├── __pycache__
│ │ └── settings.cpython-36.pyc
│ └── settings.py
└── server.py #server端启动程序
程序启动:
客户端启动:RPC_Client/bin/start.py
服务端启动:RPC_Server/server.py
RPC 客户端bin目录start.py
import os,sys,platform
if platform.system() == 'Windows':
BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])
else:
BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
sys.path.append(BASE_DIR)
from core import main
if __name__ == "__main__":
handle = main.Handle()
handle.start()
RPC客户度conf目录 settings.py
import os,sys,platform
if platform.system() == 'Windows':
BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])
else:
BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
sys.path.append(BASE_DIR)
RabbitmqHost = '192.168.17.102'
RabbitmqUser = 'admin'
RabbitmqPwd = 'admin'
credentails = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)
RPC客户端主程序core/main.py
import pika
import random
import threading
from modules import client
from conf import settings
class Handle(object):
def __init__(self):
# 建立连接,指定服务器的ip地址
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.RabbitmqHost,credentials=settings.credentails))
# 建立一个会话,每个channel代表一个会话任务
self.channel = self.connection.channel()
def run_cmd(self,cmd, host):
rpc_client = client.Client(self.connection,self.channel)
task_id = str(random.randint(1000,9999)) #生成4位的Correlation id
response = rpc_client.call(cmd, host)
self.corr_id = response[1]
print('Task_ID',task_id)
self.info[task_id] = [self.corr_id,host,cmd,response[0],response[1]]
def start(self):
self.info = {} #task 返回任务信息字典
help = '''
命令格式
执行系统命令:run command host eg: run ls 10.10.0.10
查看所有执行任务:check_task
查看指定任务结果:get_task id eg:get_task 6723
'''
print(help)
while True:
msg = input(">> ").strip()
if msg.startswith('run') and len(msg.split()) >= 3:
cmd = msg.split()[1]
#多线程运行
th_join = []
for host in msg.split()[2:]:
th = threading.Thread(target=self.run_cmd,args=(cmd,host,))
th.start()
th_join.append(th)
for t in th_join:
t.join()
elif msg == 'check_task':
if not self.info:
print("没有任务队列")
continue
else:
for taskid,task in self.info.items():
print('TaskID [%s] Host [%s] COMMAND [%s]'%(taskid,task[1],task[2]))
elif msg.startswith('get_task'):
rpc_client = client.Client(self.connection,self.channel)
if msg.split()[1] in self.info:
task_id = msg.split()[1]
callback_queue = self.info[task_id][3]
correlation_id = self.info[task_id][4]
task_result = rpc_client.get_task(callback_queue,correlation_id)
del self.info[task_id]
print(task_result.decode().strip())
else:
print('输入的task ID不存在!')
continue
elif not msg:
continue
else:
print('输入错误,请重新输入!')
continue
RPC客户端modules
import pika
import random
import uuid
class Client(object):
def __init__(self,connection,channel):
self.connection = connection
self.channel = channel
# 对回调队列中的响应进行处理的函数
def on_response(self, ch, method, props, body):
if self.correlation_id == props.correlation_id:
self.response = body
ch.basic_ack(delivery_tag=method.delivery_tag)
def get_task(self,callback_queue,correlation_id):
# 初始化 response
self.response = None
self.correlation_id = correlation_id
# 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理;
self.channel.basic_consume(self.on_response,queue=callback_queue)
while self.response is None:
self.connection.process_data_events()
return self.response
# 发出RPC请求
def call(self,cmd,host):
# 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次
result = self.channel.queue_declare(exclusive=True)
# 将次队列指定为当前客户端的回调队列
self.callback_queue = result.method.queue
msg = cmd + " " + "".join(host)
self.corr_id = str(uuid.uuid4())
#self.corr_id = corr_id
# 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id`
self.channel.basic_publish(exchange='',
routing_key='rpc_queue2',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=msg)
return self.callback_queue,self.corr_id
RPC服务器settings.py
RabbitmqHost = '192.168.17.102'
RabbitmqUser = 'admin'
RabbitmqPwd = 'admin'
credentails = pika.PlainCredentials(RabbitmqUser,RabbitmqPwd)
RPC服务端主程序:
#!/usr/bin/env python
import pika
import paramiko
import os,sys,platform
if platform.system() == 'Windows':
BASE_DIR = '\\'.join(os.path.abspath(os.path.dirname(__file__)).split('\\')[:-1])
else:
BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
sys.path.append(BASE_DIR)
from conf import settings
# 建立连接,服务器地址为localhost,可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.RabbitmqHost,credentials=settings.credentails))
# 建立会话
channel = connection.channel()
# 声明RPC请求队列
channel.queue_declare(queue='rpc_queue2')
# 数据处理方法
def exec_cmd(cmd,host):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=host, port=22, username='root', password='123456',timeout=10)
stdin, stdout, stderr = ssh.exec_command(cmd)
stdout_result = stdout.read()
stderr_result = stderr.read()
result = stdout_result if stdout_result else stderr_result
return result.decode()
ssh.close()
# 对RPC请求队列中的请求进行处理
def on_request(ch, method, props, body):
cmd = body.split()[0]
host = body.split()[1]
# 调用数据处理方法
response = exec_cmd(cmd,host)
# 将处理结果(响应)发送到回调队列
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_request, queue='rpc_queue2')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
本文转自 baiying 51CTO博客,原文链接:http://blog.51cto.com/baiying/2065436,如需转载请自行联系原作者