训练shell常用脚本练习(四)

简介: 【4月更文挑战第14天】shell代码训练(四)

在当今数字化时代,运维工作的重要性日益凸显。面对复杂多变的IT环境,高效的自动化工具和脚本成为每一位运维人员提升工作效率、保障系统稳定的关键利器。为了助力广大运维同行在日常工作中更加得心应手,我特此精心整理了一份实用的Shell脚本合集,旨在为大家提供一份“运维人的福利”,以期在繁琐的任务处理与系统管理中,助您一臂之力。

这份脚本集合凝聚了我个人在实际运维工作中积累的经验与智慧,涵盖了诸如服务器监控、故障排查、备份恢复、性能调优、自动化部署等诸多核心场景。每一行代码都经过实战打磨,力求简洁高效,易读易用,旨在适应各种复杂的运维环境,满足不同层次运维需求。无论您是初入运维领域的新人,还是经验丰富的资深专家,都能从中找到适合自己的工具,让繁杂的运维工作化繁为简,事半功倍。

在这里,我诚挚地邀请各位运维同仁随意拿取、自由分享这份Shell脚本资源。它们不仅是您日常工作的得力助手,更是交流学习、提升技能的良好素材。让我们共同携手,借助科技的力量,驱动运维工作的智能化、自动化进程,为构建稳定、高效的信息系统保驾护航。

1.Saltstack 客户端 通过 api 对 Saltstack 服务端进行操作,执行命令。

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import requests

import json

import copy

class SaltApi:

"""

定义salt api接口的类

初始化获得token

"""

def __init__(self):

self.url = "http://172.85.10.21:8000/"

self.username = "saltapi"

self.password = "saltapi"

self.headers = {"Content-type": "application/json"}

self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}

self.login_url = self.url + "login"

self.login_params = {'username': self.username, 'password':

self.password, 'eauth': 'pam'}

self.token = self.get_data(self.login_url, self.login_params)['token']

self.headers['X-Auth-Token'] = self.token

def get_data(self, url, params):

'''

请求url获取数据

:param url: 请求的url地址

:param params: 传递给url的参数

:return: 请求的结果

'''

send_data = json.dumps(params)

request = requests.post(url, data=send_data, headers=self.headers)

response = request.json()

result = dict(response)

return result['return'][0]

def get_auth_keys(self):

'''

获取所有已经认证的key

:return:

'''

data = copy.deepcopy(self.params)

data['client'] = 'wheel'

data['fun'] = 'key.list_all'

result = self.get_data(self.url, data)

try:

return result['data']['return']['minions']

except Exception as e:

return str(e)

def get_grains(self, tgt, arg='id'):

"""

获取系统基础信息

:tgt: 目标主机

:return:

"""

data = copy.deepcopy(self.params)

if tgt:

data['tgt'] = tgt

else:

data['tgt'] = '*'

data['fun'] = 'grains.item'

data['arg'] = arg

result = self.get_data(self.url, data)

return result

def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list',

salt_async=False):

"""

执行saltstack 模块命令,类似于salt '*' cmd.run 'command'

:param tgt: 目标主机

:param fun: 模块方法 可为空

:param arg: 传递参数 可为空

:return: 执行结果

"""

data = copy.deepcopy(self.params)

if not tgt: return {'status': False, 'msg': 'target host not exist'}

if not arg:

data.pop('arg')

else:

data['arg'] = arg

if tgt != '*':

data['tgt_type'] = tgt_type

if salt_async: data['client'] = 'local_async'

data['fun'] = fun

data['tgt'] = tgt

result = self.get_data(self.url, data)

return result

def jobs(self, fun='detail', jid=None):

"""

任务

:param fun: active, detail

:param jod: Job ID

:return: 任务执行结果

"""

data = {'client': 'runner'}

data['fun'] = fun

if fun == 'detail':

if not jid: return {'success': False, 'msg': 'job id is none'}

data['fun'] = 'jobs.lookup_jid'

data['jid'] = jid

else:

return {'success': False, 'msg': 'fun is active or detail'}

result = self.get_data(self.url, data)

return result

2.vCenter 客户端 通过官方 SDK 对 vCenter 进行日常操作,此脚本是我用于 cmdb 平台的,自动获取主机信息,存入数据库。

from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL

from pyVmomi import vim

from asset import models

import atexit

class Vmware:

def __init__(self, ip, user, password, port, idc, vcenter_id):

self.ip = ip

self.user = user

self.password = password

self.port = port

self.idc_id = idc

self.vcenter_id = vcenter_id

def get_obj(self, content, vimtype, name=None):

'''

列表返回,name 可以指定匹配的对象

'''

container = content.viewManager.CreateContainerView(content.rootFolder,

vimtype, True)

obj = [ view for view in container.view ]

return obj

def get_esxi_info(self):

# 宿主机信息

esxi_host = {}

res = {"connect_status": True, "msg": None}

try:

# connect this thing

si = SmartConnectNoSSL(host=self.ip, user=self.user,

pwd=self.password, port=self.port, connectionPoolTimeout=60)

except Exception as e:

res['connect_status'] = False

try:

res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)

except Exception as e:

res['msg'] = '%s: connection error' % (self.ip)

return res

# disconnect this thing

atexit.register(Disconnect, si)

content = si.RetrieveContent()

esxi_obj = self.get_obj(content, [vim.HostSystem])

for esxi in esxi_obj:

esxi_host[esxi.name] = {}

esxi_host[esxi.name]['idc_id'] = self.idc_id

esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id

esxi_host[esxi.name]['server_ip'] = esxi.name

esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor

esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model

for i in esxi.summary.hardware.otherIdentifyingInfo:

if isinstance(i, vim.host.SystemIdentificationInfo):

esxi_host[esxi.name]['server_sn'] = i.identifierValue

# 系统名称

esxi_host[esxi.name]['system_name'] =

esxi.summary.config.product.fullName

# cpu总核数

esxi_cpu_total = esxi.summary.hardware.numCpuThreads

# 内存总量 GB

esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 /

1024

# 获取硬盘总量 GB

esxi_disk_total = 0

for ds in esxi.datastore:

esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024

# 默认配置4核8G100G,根据这个配置计算剩余可分配虚拟机

default_configure = {

'cpu': 4,

'memory': 8,

'disk': 100

}

esxi_host[esxi.name]['vm_host'] = []

vm_usage_total_cpu = 0

vm_usage_total_memory = 0

vm_usage_total_disk = 0

# 虚拟机信息

for vm in esxi.vm:

host_info = {}

host_info['vm_name'] = vm.name

host_info['power_status'] = vm.runtime.powerState

host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) +

'核'

host_info['memory_total'] = str(vm.config.hardware.memoryMB) +

'MB'

host_info['system_info'] = vm.config.guestFullName

disk_info = ''

disk_total = 0

for d in vm.config.hardware.device:

if isinstance(d, vim.vm.device.VirtualDisk):

disk_total += d.capacityInKB / 1024 / 1024

disk_info += d.deviceInfo.label + ": " +

str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','

host_info['disk_info'] = disk_info

esxi_host[esxi.name]['vm_host'].append(host_info)

# 计算当前宿主机可用容量:总量 - 已分配的

if host_info['power_status'] == 'poweredOn':

vm_usage_total_cpu += vm.config.hardware.numCPU

vm_usage_total_disk += disk_total

vm_usage_total_memory += (vm.config.hardware.memoryMB /

1024)

esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu

esxi_memory_free = esxi_memory_total - vm_usage_total_memory

esxi_disk_free = esxi_disk_total - vm_usage_total_disk

esxi_host[esxi.name]['cpu_info'] = 'Total: %d核, Free: %d核' %

(esxi_cpu_total, esxi_cpu_free)

esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' %

(esxi_memory_total, esxi_memory_free)

esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' %

(esxi_disk_total, esxi_disk_free)

# 计算cpu 内存 磁盘按照默认资源分配的最小值,即为当前可分配资源

if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free <

100:

free_allocation_vm_host = 0

else:

free_allocation_vm_host = int(min(

[

esxi_cpu_free / default_configure['cpu'],

esxi_memory_free / default_configure['memory'],

esxi_disk_free / default_configure['disk']

]

))

esxi_host[esxi.name]['free_allocation_vm_host'] =

free_allocation_vm_host

esxi_host['connect_status'] = True

return esxi_host

def write_to_db(self):

esxi_host = self.get_esxi_info()

# 连接失败

if not esxi_host['connect_status']:

return esxi_host

del esxi_host['connect_status']

for machine_ip in esxi_host:

# 物理机信息

esxi_host_dict = esxi_host[machine_ip]

# 虚拟机信息

virtual_host = esxi_host[machine_ip]['vm_host']

del esxi_host[machine_ip]['vm_host']

obj = models.EsxiHost.objects.create(**esxi_host_dict)

obj.save()

for host_info in virtual_host:

host_info['management_host_id'] = obj.id

obj2 = models.virtualHost.objects.create(**host_info)

obj2.save()

3.获取域名 ssl 证书过期时间用于 zabbix 告警

import re

import sys

import time

import subprocess

from datetime import datetime

from io import StringIO

def main(domain):

f = StringIO()

comm = f"curl -Ivs https://{domain} --connect-timeout 10"

result = subprocess.getstatusoutput(comm)

f.write(result[1])

try:

m = re.search('start date: (.*?)\n.*?expire date: (.*?)\n.*?common name:

(.*?)\n.*?issuer: CN=(.*?)\n', f.getvalue(), re.S)

start_date = m.group(1)

expire_date = m.group(2)

common_name = m.group(3)

issuer = m.group(4)

except Exception as e:

return 999999999

# time 字符串转时间数组

start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")

start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)

# datetime 字符串转时间数组

expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")

expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")

# 剩余天数

remaining = (expire_date-datetime.now()).days

return remaining

if __name__ == "__main__":

domain = sys.argv[1]

remaining_days = main(domain)

print(remaining_days)

4.发送今天的天气预报以及未来的天气趋势图 通过企业微信进行通知

image.png

# -*- coding: utf-8 -*-

import requests

import json

import datetime

def weather(city):

url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city

try:

data = requests.get(url).json()['data']

city = data['city']

ganmao = data['ganmao']

today_weather = data['forecast'][0]

res = "今天是{}\n今天天气概况\n城市: {:<10}\n时间: {:<10}\n高温: {:

<10}\n低温: {:<10}\n风力: {:<10}\n风向: {:<10}\n天气: {:<10}\n\n稍后会发送近期温度趋势图,请注意查看。\

".format(

ganmao,

city,

datetime.datetime.now().strftime('%Y-%m-%d'),

today_weather['high'].split()[1],

today_weather['low'].split()[1],

today_weather['fengli'].split('[')[2].split(']')[0],

today_weather['fengxiang'],today_weather['type'],

)

return {"source_data": data, "res": res}

except Exception as e:

return str(e)

```

+ 获取天气预报趋势图

```python

# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt

import re

import datetime

def Future_weather_states(forecast, save_path, day_num=5):

'''

展示未来的天气预报趋势图

:param forecast: 天气预报预测的数据

:param day_num: 未来几天

:return: 趋势图

'''

future_forecast = forecast

dict={}

for i in range(day_num):

data = []

date = future_forecast[i]["date"]

date = int(re.findall("\d+",date)[0])

data.append(int(re.findall("\d+", future_forecast[i]["high"])[0]))

data.append(int(re.findall("\d+", future_forecast[i]["low"])[0]))

data.append(future_forecast[i]["type"])

dict[date] = data

data_list = sorted(dict.items())

date=[]

high_temperature = []

low_temperature = []

for each in data_list:

date.append(each[0])

high_temperature.append(each[1][0])

low_temperature.append(each[1][1])

fig = plt.plot(date,high_temperature,"r",date,low_temperature,"b")

current_date = datetime.datetime.now().strftime('%Y-%m')

plt.rcParams['font.sans-serif'] = ['SimHei']

plt.rcParams['axes.unicode_minus'] = False

plt.xlabel(current_date)

plt.ylabel("℃")

plt.legend(["高温", "低温"])

plt.xticks(date)

plt.title("最近几天温度变化趋势")

plt.savefig(save_path)

```

+ 发送到企业微信

```python

# -*- coding: utf-8 -*-

import requests

import json

class DLF:

def __init__(self, corpid, corpsecret):

self.url = "https://qyapi.weixin.qq.com/cgi-bin"

self.corpid = corpid

self.corpsecret = corpsecret

self._token = self._get_token()

def _get_token(self):

'''

获取企业微信API接口的access_token

:return:

'''

token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %

(self.corpid, self.corpsecret)

try:

res = requests.get(token_url).json()

token = res['access_token']

return token

except Exception as e:

return str(e)

def _get_media_id(self, file_obj):

get_media_url = self.url + "/media/upload?access_token=

{}&type=file".format(self._token)

data = {"media": file_obj}

try:

res = requests.post(url=get_media_url, files=data)

media_id = res.json()['media_id']

return media_id

except Exception as e:

return str(e)

def send_text(self, agentid, content, touser=None, toparty=None):

send_msg_url = self.url + "/message/send?access_token=%s" %

(self._token)

send_data = {

"touser": touser,

"toparty": toparty,

"msgtype": "text",

"agentid": agentid,

"text": {

"content": content

}

}

try:

res = requests.post(send_msg_url, data=json.dumps(send_data))

except Exception as e:

return str(e)

def send_image(self, agentid, file_obj, touser=None, toparty=None):

media_id = self._get_media_id(file_obj)

send_msg_url = self.url + "/message/send?access_token=%s" %

(self._token)

send_data = {

"touser": touser,

"toparty": toparty,

"msgtype": "image",

"agentid": agentid,

"image": {

"media_id": media_id

}

}

try:

res = requests.post(send_msg_url, data=json.dumps(send_data))

except Exception as e:

return str(e)

+ main脚本

# -*- coding: utf-8 -*-

from plugins.weather_forecast import weather

from plugins.trend_chart import Future_weather_states

from plugins.send_wechat import DLF

import os

# 企业微信相关信息

corpid = "xxx"

corpsecret = "xxx"

agentid = "xxx"

# 天气预报趋势图保存路径

_path = os.path.dirname(os.path.abspath(__file__))

save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg')

# 获取天气预报信息

content = weather("大兴")

# 发送文字消息

dlf = DLF(corpid, corpsecret)

dlf.send_text(agentid=agentid, content=content['res'], toparty='1')

# 生成天气预报趋势图

Future_weather_states(content['source_data']['forecast'], save_path)

# 发送图片消息

file_obj = open(save_path, 'rb')

dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)

如果想上手操作练代码的同学们可以通过阿里云ecs服务器免费试用参与!

入口:新老同学免费试用

目录
相关文章
|
2月前
|
Shell
一个用于添加/删除定时任务的shell脚本
一个用于添加/删除定时任务的shell脚本
106 1
|
1月前
|
Shell Linux 测试技术
6种方法打造出色的Shell脚本
6种方法打造出色的Shell脚本
59 2
6种方法打造出色的Shell脚本
|
25天前
|
XML JSON 监控
Shell脚本要点和难点以及具体应用和优缺点介绍
Shell脚本在系统管理和自动化任务中扮演着重要角色。尽管存在调试困难、可读性差等问题,但其简洁高效、易于学习和强大的功能使其在许多场景中不可或缺。通过掌握Shell脚本的基本语法、常用命令和函数,并了解其优缺点,开发者可以编写出高效的脚本来完成各种任务,提高工作效率。希望本文能为您在Shell脚本编写和应用中提供有价值的参考和指导。
53 1
|
29天前
|
Ubuntu Shell 开发工具
ubuntu/debian shell 脚本自动配置 gitea git 仓库
这是一个自动配置 Gitea Git 仓库的 Shell 脚本,支持 Ubuntu 20+ 和 Debian 12+ 系统。脚本会创建必要的目录、下载并安装 Gitea,创建 Gitea 用户和服务,确保 Gitea 在系统启动时自动运行。用户可以选择从官方或小绿叶技术博客下载安装包。
45 2
|
2月前
|
监控 网络协议 Shell
ip和ip网段攻击拦截系统-绿叶结界防火墙系统shell脚本
这是一个名为“小绿叶技术博客扫段攻击拦截系统”的Bash脚本,用于监控和拦截TCP攻击。通过抓取网络数据包监控可疑IP,并利用iptables和firewalld防火墙规则对这些IP进行拦截。同时,该系统能够查询数据库中的白名单,确保合法IP不受影响。此外,它还具备日志记录功能,以便于后续分析和审计。
51 6
|
1月前
|
运维 监控 Shell
深入理解Linux系统下的Shell脚本编程
【10月更文挑战第24天】本文将深入浅出地介绍Linux系统中Shell脚本的基础知识和实用技巧,帮助读者从零开始学习编写Shell脚本。通过本文的学习,你将能够掌握Shell脚本的基本语法、变量使用、流程控制以及函数定义等核心概念,并学会如何将这些知识应用于实际问题解决中。文章还将展示几个实用的Shell脚本例子,以加深对知识点的理解和应用。无论你是运维人员还是软件开发者,这篇文章都将为你提供强大的Linux自动化工具。
|
2月前
|
监控 Unix Shell
shell脚本编程学习
【10月更文挑战第1天】shell脚本编程
79 12
|
2月前
|
存储 运维 监控
自动化运维:使用Shell脚本简化日常任务
【9月更文挑战第35天】在IT运维的日常工作中,重复性的任务往往消耗大量的时间。本文将介绍如何通过编写简单的Shell脚本来自动化这些日常任务,从而提升效率。我们将一起探索Shell脚本的基础语法,并通过实际案例展示如何应用这些知识来创建有用的自动化工具。无论你是新手还是有一定经验的运维人员,这篇文章都会为你提供新的视角和技巧,让你的工作更加轻松。
69 2
|
3月前
|
Shell
shell脚本变量 $name ${name}啥区别
shell脚本变量 $name ${name}啥区别
|
2月前
|
存储 Shell Linux
【Linux】shell基础,shell脚本
Shell脚本是Linux系统管理和自动化任务的重要工具,掌握其基础及进阶用法能显著提升工作效率。从简单的命令序列到复杂的逻辑控制和功能封装,Shell脚本展现了强大的灵活性和实用性。不断实践和探索,将使您更加熟练地运用Shell脚本解决各种实际问题
34 0