在当今数字化时代,运维工作的重要性日益凸显。面对复杂多变的IT环境,高效的自动化工具和脚本成为每一位运维人员提升工作效率、保障系统稳定的关键利器。为了助力广大运维同行在日常工作中更加得心应手,我特此精心整理了一份实用的Shell脚本合集,旨在为大家提供一份“运维人的福利”,以期在繁琐的任务处理与系统管理中,助您一臂之力。
这份脚本集合凝聚了我个人在实际运维工作中积累的经验与智慧,涵盖了诸如服务器监控、故障排查、备份恢复、性能调优、自动化部署等诸多核心场景。每一行代码都经过实战打磨,力求简洁高效,易读易用,旨在适应各种复杂的运维环境,满足不同层次运维需求。无论您是初入运维领域的新人,还是经验丰富的资深专家,都能从中找到适合自己的工具,让繁杂的运维工作化繁为简,事半功倍。
在这里,我诚挚地邀请各位运维同仁随意拿取、自由分享这份Shell脚本资源。它们不仅是您日常工作的得力助手,更是交流学习、提升技能的良好素材。让我们共同携手,借助科技的力量,驱动运维工作的智能化、自动化进程,为构建稳定、高效的信息系统保驾护航。
1.企业微信告警。此脚本通过企业微信应用,进行微信告警,可用于 Zabbix 监控。
# -*- coding: utf-8 -*-
import requests
import json
class DLF:
def __init__(self, corpid, corpsecret):
self.url = "https://qyapi.weixin.qq.com/cgi-bin"
self.corpid = corpid
self.corpsecret = corpsecret
self._token = self._get_token()
def _get_token(self):
'''
获取企业微信API接口的access_token
:return:
'''
token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid,
self.corpsecret)
try:
res = requests.get(token_url).json()
token = res['access_token']
return token
except Exception as e:
return str(e)
def _get_media_id(self, file_obj):
get_media_url = self.url + "/media/upload?access_token=
{}&type=file".format(self._token)
data = {"media": file_obj}
try:
res = requests.post(url=get_media_url, files=data)
media_id = res.json()['media_id']
return media_id
except Exception as e:
return str(e)
def send_text(self, agentid, content, touser=None, toparty=None):
send_msg_url = self.url + "/message/send?access_token=%s" %
(self._token)
send_data = {
"touser": touser,
"toparty": toparty,
"msgtype": "text",
"agentid": agentid,
"text": {
"content": content
}
}
try:
res = requests.post(send_msg_url, data=json.dumps(send_data))
except Exception as e:
return str(e)
def send_image(self, agentid, file_obj, touser=None, toparty=None):
media_id = self._get_media_id(file_obj)
send_msg_url = self.url + "/message/send?access_token=%s" %
(self._token)
send_data = {
"touser": touser,
"toparty": toparty,
"msgtype": "image",
"agentid": agentid,
"image": {
"media_id": media_id
}
}
try:
res = requests.post(send_msg_url, data=json.dumps(send_data))
except Exception as e:
return str(e)
2.FTP 客户端 通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。
# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy
class FTPClient:
def __init__(self, host, user, passwd, port=21):
self.host = host
self.user = user
self.passwd = passwd
self.port = port
self.res = {'status': True, 'msg': None}
self._ftp = None
self._login()
def _login(self):
'''
登录FTP服务器
:return: 连接或登录出现异常时返回错误信息
'''
try:
self._ftp = FTP()
self._ftp.connect(self.host, self.port, timeout=30)
self._ftp.login(self.user, self.passwd)
except Exception as e:
return e
def upload(self, localpath, remotepath=None):
'''
上传ftp文件
:param localpath: local file path
:param remotepath: remote file path
:return:
'''
if not localpath: return 'Please select a local file. '
# 读取本地文件
# fp = open(localpath, 'rb')
# 如果未传递远程文件路径,则上传到当前目录,文件名称同本地文件
if not remotepath:
remotepath = path.basename(localpath)
# 上传文件
self._ftp.storbinary('STOR ' + remotepath, localpath)
# fp.close()
def download(self, remotepath, localpath=None):
'''
localpath
:param localpath: local file path
:param remotepath: remote file path
:return:
'''
if not remotepath: return 'Please select a remote file. '
# 如果未传递本地文件路径,则下载到当前目录,文件名称同远程文件
if not localpath:
localpath = path.basename(remotepath)
# 如果localpath是目录的话就和remotepath的basename拼接
if path.isdir(localpath):
localpath = path.join(localpath, path.basename(remotepath))
# 写入本地文件
fp = open(localpath, 'wb')
# 下载文件
self._ftp.retrbinary('RETR ' + remotepath, fp.write)
fp.close()
def nlst(self, dir='/'):
'''
查看目录下的内容
:return: 以列表形式返回目录下的所有内容
'''
files_list = self._ftp.nlst(dir)
return files_list
def rmd(self, dir=None):
'''
删除目录
:param dir: 目录名称
:return: 执行结果
'''
if not dir: return 'Please input dirname'
res = copy.deepcopy(self.res)
try:
del_d = self._ftp.rmd(dir)
res['msg'] = del_d
except Exception as e:
res['status'] = False
res['msg'] = str(e)
return res
def mkd(self, dir=None):
'''
创建目录
:param dir: 目录名称
:return: 执行结果
'''
if not dir: return 'Please input dirname'
res = copy.deepcopy(self.res)
try:
mkd_d = self._ftp.mkd(dir)
res['msg'] = mkd_d
except Exception as e:
res['status'] = False
res['msg'] = str(e)
return res
def del_file(self, filename=None):
'''
删除文件
:param filename: 文件名称
:return: 执行结果
'''
if not filename: return 'Please input filename'
res = copy.deepcopy(self.res)
try:
del_f = self._ftp.delete(filename)
res['msg'] = del_f
except Exception as e:
res['status'] = False
res['msg'] = str(e)
return res
def get_file_size(self, filenames=[]):
'''
获取文件大小,单位是字节
判断文件类型
:param filename: 文件名称
:return: 执行结果
'''
if not filenames: return {'msg': 'This is an empty directory'}
res_l = []
for file in filenames:
res_d = {}
# 如果是目录或者文件不存在就会报错
try:
size = self._ftp.size(file)
type = 'f'
except:
# 如果是路径的话size显示 - , file末尾加/ (/dir/)
size = '-'
type = 'd'
file = file + '/'
res_d['filename'] = file
res_d['size'] = size
res_d['type'] = type
res_l.append(res_d)
return res_l
def rename(self, old_name=None, new_name=None):
'''
重命名
:param old_name: 旧的文件或者目录名称
:param new_name: 新的文件或者目录名称
:return: 执行结果
'''
if not old_name or not new_name: return 'Please input old_name and
new_name'
res = copy.deepcopy(self.res)
try:
rename_f = self._ftp.rename(old_name, new_name)
res['msg'] = rename_f
except Exception as e:
res['status'] = False
res['msg'] = str(e)
return res
def close(self):
'''
退出ftp连接
:return:
'''
try:
# 向服务器发送quit命令
self._ftp.quit()
except Exception:
return 'No response from server'
finally:
# 客户端单方面关闭连接
self._ftp.close()
3.SSH 客户端 此脚本仅用于通过 key 连接,如需要密码连接,简单修改下即可。
# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
def __init__(self, host, port, user, pkey):
self.ssh_host = host
self.ssh_port = port
self.ssh_user = user
self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
self.ssh = None
self._connect()
def _connect(self):
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port,
username=self.ssh_user, pkey=self.private_key, timeout=10)
except:
return 'ssh connect fail'
def execute_command(self, command):
stdin, stdout, stderr = self.ssh.exec_command(command)
out = stdout.read()
err = stderr.read()
return out, err
def close(self):
self.ssh.close()
如果想上手操作练代码的同学们可以通过阿里云ecs服务器免费试用参与!
入口一:新老同学免费试用
入口二:上云第一站
入口三:学生版超低价云服务器
入口四:云服务器专享特惠版
入口五:云服务器特惠1.5折起
入口七:阿里云最新活动中心
入口八:中小企业权益满减礼包