一、SSH操作及接收响应
common.py
# coding=utf-8
"""
@Project :pachong-master
@File    :common.py
@Author  :gaojs
@Date    :2022/7/9 14:01
@Blogs   : https://www.gaojs.com.cn

Drive an AG device's CLI over SSH and fetch the password of its ``test``
account from the password service.
"""
import codecs
import logging
import re
import socket
import sys
import time
from time import sleep

import paramiko
import requests
from faker import Factory

sys.setrecursionlimit(5000)


class CLI:
    """Interactive SSH shell session that records everything it reads in ``log.txt``."""

    def ssh_ag(self, hostname='192.168.120.209', port=22, username='array',
               password='admin'):
        """Connect to the AG over SSH, open a shell and send the setup commands.

        After connecting, the commands ``enable``, an empty line,
        ``config ter`` and ``no page`` are sent in that order.

        :param hostname: address of the AG to connect to
        :param port: SSH port
        :param username: SSH login name
        :param password: SSH login password
        :return: None
        :raises Exception: whatever the SSH setup raises; the log file and
            the SSH client are closed before the error propagates
        """
        # Explicit UTF-8 so the transcript does not depend on the platform's
        # default encoding.
        self.fin = open('log.txt', 'w', encoding='utf-8')
        self.ssh = paramiko.SSHClient()
        self.logging = logging
        # Allow hosts that are not in known_hosts.
        # NOTE(review): AutoAddPolicy accepts any host key without
        # verification; only appropriate on a trusted network.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            # Connect to the AG.
            self.ssh.connect(hostname=hostname, port=port, username=username,
                             password=password)
            sleep(5)
            channel = self.ssh.invoke_shell()
            self.channel = channel
            channel.settimeout(5)
            # Record whatever the shell printed on login.
            output = channel.recv(2048).decode('utf-8', errors='replace')
            time.sleep(1)
            self.fin.write(output)
            self.cli_cmd('enable')
            self.cli_cmd('')
            self.cli_cmd('config ter')
            self.cli_cmd('no page')
        except Exception:
            # Do not leak the log file or the connection when setup fails.
            self.close()
            raise

    def close(self):
        """Close the SSH client and the log file if they were opened.

        Safe to call more than once and before ``ssh_ag``.
        """
        ssh = getattr(self, 'ssh', None)
        if ssh is not None:
            ssh.close()
        fin = getattr(self, 'fin', None)
        if fin is not None and not fin.closed:
            fin.close()

    def print_step(self):
        """Read up to 2048 bytes from the shell channel and print them."""
        result = self.channel.recv(2048)
        print(result.decode())

    def cli_cmd(self, cli, prompt=r"[#\$]", timeout=3):
        """Send one command line and collect the output up to the prompt.

        :param cli: command text; a newline is appended before sending
        :param prompt: regular expression that marks the end of the output
        :param timeout: seconds to wait for ``prompt`` to appear
        :return: the text read from the channel (see ``read_until``)
        """
        self.logging.info("AG execute command in enable mode: %s", cli)
        self.channel.send(cli + '\n')
        return self.read_until(prompt, timeout)

    def quit_enable(self):
        """Send ``switch global`` followed by ``exit``."""
        self.cli_cmd('switch global')
        self.cli_cmd('exit')

    def clear_log(self):
        """Wait two seconds, then send ``clear log b``."""
        sleep(2)
        self.cli_cmd('clear log b')

    def read_until(self, expected, timeout=10):
        """Read from the channel until ``expected`` matches or time runs out.

        Every chunk read is also appended to the log file.

        :param expected: regular expression searched in the accumulated text
        :param timeout: maximum number of seconds to keep reading
        :return: all text read so far; it may lack a match if the deadline
            passed or the channel was closed
        """
        regexp = re.compile(expected)
        # Incremental decoding keeps a multi-byte character that straddles two
        # recv() chunks intact instead of failing to decode the whole chunk.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        output = ''
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                chunk = self.channel.recv(1024)
            except socket.timeout:
                # Nothing arrived within the channel timeout; keep waiting
                # until the overall deadline.
                continue
            if not chunk:
                # An empty read means the channel was closed; no more data
                # will ever arrive, so stop instead of spinning.
                break
            text = decoder.decode(chunk)
            self.fin.write(text)
            self.fin.flush()
            output += text
            if regexp.search(output):
                break
        return output

    def switch_vsite(self, vsite):
        """Switch to a virtual site by sending ``sw <vsite>``.

        :param vsite: name of the virtual site
        """
        self.cli_cmd(f'sw {vsite}')

    def get_sn(self):
        """Return the serial number reported by ``show version``.

        :return: the 31-character serial number, without surrounding
            whitespace
        :raises IndexError: if the output contains no serial number
        """
        resp = self.cli_cmd('show version')
        match = re.search(r'Serial Number : ([A-Z0-9]{31})', resp)
        if match is None:
            # IndexError is kept for compatibility with existing callers.
            raise IndexError("serial number not found in 'show version' output")
        return match.group(1)

    def create_test_password(self, sn):
        """Ask the password service for the ``test`` user's password.

        :param sn: serial number posted to the service as ``serial``
        :return: the password extracted from the response
        :raises IndexError: if the response contains no password
        """
        url = 'http://10.3.0.50/cgi-bin/passwd_res'
        headers = {'User-Agent': Factory.create().user_agent()}
        data = {'serial': sn}
        # Without a timeout the request can block forever on a dead service.
        rsp = requests.post(url=url, headers=headers, data=data, timeout=10)
        found = re.search('password: (.*?)</pre>', rsp.text)
        if found is None:
            # IndexError is kept for compatibility with existing callers.
            raise IndexError(
                f'password not found in the service response (HTTP {rsp.status_code})'
            )
        passwd = found.group(1)
        print(passwd)
        return passwd


if __name__ == '__main__':
    # Fetch the test-account password for the AG whose management IP is
    # entered on the console.
    ag_ip = input('请输入您AG管理IP:')
    test = CLI()
    try:
        test.ssh_ag(hostname=ag_ip)
        sn_number = test.get_sn()
        passwd = test.create_test_password(sn_number)
    finally:
        test.close()
    print(f'*********************** 您的test账户密码是 {passwd} ************************\n')
二、云服务器操作
# coding=utf-8 """ @Project :pachong-master @File :test01.py @Author :gaojs @Date :2022/7/9 15:40 @Blogs : https://www.gaojs.com.cn """ import sys import logging import time import paramiko from time import sleep import re sys.setrecursionlimit(5000) class CLI: def ssh_ag(self, hostname='101.4x.39.xxx', port=22, username='root', password='xxxxxx'): """ :param self: :return: """ # 创建ssh对象 self.fin = open('log.txt', 'w+') self.ssh = paramiko.SSHClient() self.logging = logging # 允许连接不在know_hosts文件中的主机 self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) # 连接AG self.ssh.connect(hostname=hostname, port=port, username=username, password=password) sleep(5) channel = self.ssh.invoke_shell() self.channel = channel channel.settimeout(5) output = channel.recv(2048).decode('utf-8') time.sleep(1) self.fin.write(output) def print_step(self): """ :return: """ result = self.channel.recv(2048) print(result.decode()) def cli_cmd(self, cli, prompt="[#\$]", timeout=3): """ :param cli: :return: """ output = '' self.logging.info("cloud server execute command in enable mode: " + cli) self.channel.send(cli + '\n') output = self.read_until(prompt, timeout) return output def read_until(self, expected, timeout=10): """ 等待回显 :return: """ output = '' regexp = re.compile(expected) max_time = time.time() + timeout i = 0 while time.time() < max_time: i = i + 1 tmp = "" try: tmp = self.channel.recv(1024).decode('utf-8') except: pass self.fin.write(tmp) self.fin.flush() output += tmp if regexp.search(output): return output return output if __name__ == '__main__': """ 自动获取test密码 """ test = CLI() test.ssh_ag() test.cli_cmd('ll /opt') test.cli_cmd('cd /var/www/html/') test.cli_cmd('date') test.cli_cmd('ls'