利用阿里云ECS运行Python脚本监测汽车车窗车门状态

本文涉及的产品
云服务器 ECS,每月免费额度280元 3个月
云服务器ECS,u1 2核4GB 1个月
简介: 利用阿里云CES运行脚本监测汽车车窗车门状态

1、购买阿里云服务器

https://www.aliyun.com/product/ecs

2、安装宝塔

yum install -ywget && wget-O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh ed099927

3、编写脚本

①导入包

importrequestsimportjsonimportosimporttimeimportsmtplibfromemail.mime.textimportMIMETextfromemail.utilsimportformataddr

②请求登录接口

deflogin():
try:
url="######"payload=json.dumps({
"username": username,
"password": password        })
headers= {
'Host': 'api.xchanger.cn',
'Cookie': '###########',
'accept': 'application/json;responseformat=3',
'content-type': 'application/json',
'x-app-id': '##',
'user-agent': 'BackgroundShortcutRunner/1092.8.7 CFNetwork/1220.1 Darwin/20.3.0',
'accept-language': 'zh-cn'        }
response=requests.request("POST", url, headers=headers, data=payload,verify=False)
print(response.json())
accessToken=response.json()['data']['accessToken']
userId=response.json()['data']['userId']
withopen(r'login.txt', "w+") asf1:
f1.write(accessToken)
withopen(r'userId.txt', "w+") asf2:
f2.write(userId)
returnaccessTokenexceptExceptionasDamnMistake:
print(DamnMistake)
mail("登录错误",str(DamnMistake))

③请求车辆状态接口

defstatus(sessionToken):
ifos.path.exists(r'userId.txt'):
withopen(r'userId.txt', "r") asf:
userId=f.read()
else:
userId="#####"url="#####"+vin+"#####"+userIdpayload= {}
headers= {
'Host': 'api.xchanger.cn',
'Cookie': '#######',
'accept': 'application/json;responseformat=3',
'ecarx-net-platform': 'tsp',
'user-agent': '#########)',
'authorization': sessionToken,
'accept-language': 'zh-cn'    }
response=requests.request("GET", url, headers=headers, data=payload,verify=False)
try:
#数据更新日期updateTime=response.json()['data']['vehicleStatus']['updateTime']
updateTime=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(response.json()['data']['vehicleStatus']['updateTime']/1000))
#左前车窗(2是关闭)winStatusDriver=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['climateStatus']['winStatusDriver']
#右前车窗(2是关闭)winStatusPassenger=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['climateStatus']['winStatusPassenger']
#左后车窗(2是关闭)winStatusDriverRear=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['climateStatus']['winStatusDriverRear']
#右后车窗(2是关闭)winStatusPassengerRear=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['climateStatus']['winStatusPassengerRear']
#冷却液是否正常(ture是正常)engineCoolantTemperatureValidity=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['runningStatus']['engineCoolantTemperatureValidity']
#后尾箱是否开启(1开启,0没有开启)trunkOpenStatus=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['trunkOpenStatus']
#引擎盖是否开启(1开启,0没有开启)engineHoodOpenStatus=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['engineHoodOpenStatus']
#左前门打开状态(0是关闭)doorOpenStatusDriver=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorOpenStatusDriver']
#右前门打开状态(0是关闭)doorOpenStatusPassenger=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorOpenStatusPassenger']
#左后门打开状态(0是关闭)doorOpenStatusDriverRear=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorOpenStatusDriverRear']
#右后门打开状态(0是关闭)doorOpenStatusPassengerRear=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorOpenStatusPassengerRear']
#左前门门锁状态(1是已上锁)doorLockStatusDriver=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorLockStatusDriver']
#左后门门锁状态(1是已上锁)doorLockStatusDriverRear=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorLockStatusDriverRear']
#右前门门锁状态(1是已上锁)doorLockStatusPassenger=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorLockStatusPassenger']
#右后门门锁状态(1是已上锁)doorLockStatusPassengerRear=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['doorLockStatusPassengerRear']
#车辆报警是否开启(1为已开启)vehicleAlarm=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['drivingSafetyStatus']['vehicleAlarm']
#电瓶电压voltage=response.json()['data']['vehicleStatus']['additionalVehicleStatus']['maintenanceStatus']['mainBatteryStatus']['voltage']
#发动机状态(engine_off为关闭,engine_running为开启)engineStatus=response.json()['data']['vehicleStatus']['basicVehicleStatus']['engineStatus']
carStatus_dict= {
"数据更新时间": updateTime,
"左前车窗": winStatusDriver,
"右前车窗": winStatusPassenger,
"左后车窗": winStatusDriverRear,
"右后车窗": winStatusPassengerRear,
"后尾箱是否开启": trunkOpenStatus,
"引擎盖是否开启": engineHoodOpenStatus,
"左前门打开状态": doorOpenStatusDriver,
"右前门打开状态": doorOpenStatusPassenger,
"左后门打开状态": doorOpenStatusDriverRear,
"右后门打开状态": doorOpenStatusPassengerRear,
"左前门门锁状态": doorLockStatusDriver,
"左后门门锁状态": doorLockStatusDriverRear,
"右前门门锁状态": doorLockStatusPassenger,
"右后门门锁状态": doorLockStatusPassengerRear,
"车辆报警是否开启": vehicleAlarm,
"电瓶电压": voltage,
"发动机状态": engineStatus        }
withopen(r'carStatus_dict.txt', "w+") asf:
f.write(str(carStatus_dict))
returncarStatus_dictexceptExceptionasDamnMistake:
print(DamnMistake)
mail("获取各项数据错误",str(DamnMistake))


⑤发送邮件

defmail(msgTitle, result_json):
my_sender='####@qq.com'# 发件人邮箱账号my_pass='####'# 发件人邮箱密码my_user='#####@qq.com'# 收件人邮箱账号try:
msg=MIMEText(json.dumps(result_json, sort_keys=True, indent=4, separators=(',', ':'), ensure_ascii=False),
'plain', 'utf-8')
msg['From'] =formataddr(["###", my_sender])  # 括号里的对应发件人邮箱昵称、发件人邮箱账号msg['To'] =formataddr(["FK", my_user])  # 括号里的对应收件人邮箱昵称、收件人邮箱账号msg['Subject'] =msgTitle# 邮件的主题server=smtplib.SMTP_SSL("smtp.qq.com", 465)  # 发件人邮箱中的SMTP服务器,端口是25server.login(my_sender, my_pass)  # 括号中对应的是发件人邮箱账号、邮箱密码server.sendmail(my_sender, my_user, msg.as_string())  # 括号中对应的是发件人邮箱账号、收件人邮箱账号、发送邮件server.quit()  # 关闭连接exceptExceptionasfuckError:  # 如果 try 中的语句没有执行,则会执行下面的print('发送邮件报错了,报错内容为:\n')
print(fuckError)

⑥主程序

defmain():
sessionToken=update_token()
carStatus_dict=status(sessionToken)
print(carStatus_dict)
car_Alarm=[]
ifcarStatus_dict['发动机状态'] =="engine_off":
ifcarStatus_dict['左前车窗'] !="2":
car_Alarm.append("左前车窗未关闭")
ifcarStatus_dict['右前车窗'] !="2":
car_Alarm.append("右前车窗未关闭")
ifcarStatus_dict['左后车窗'] !="2":
car_Alarm.append("左后车窗未关闭")
ifcarStatus_dict['右后车窗'] !="2":
car_Alarm.append("右后车窗未关闭")
ifcarStatus_dict['后尾箱是否开启'] =="1":
car_Alarm.append("后尾箱未关闭")
ifcarStatus_dict['左前门门锁状态'] !="1":
car_Alarm.append("左前门未锁")
ifcarStatus_dict['左后门门锁状态'] !="1":
car_Alarm.append("左后门未锁")
ifcarStatus_dict['右前门门锁状态'] !="1":
car_Alarm.append("右前门未锁")
ifcarStatus_dict['右后门门锁状态'] !="1":
car_Alarm.append("右后门未锁")
ifcarStatus_dict['左前门打开状态'] =="1":
car_Alarm.append("左前门未关闭")
ifcarStatus_dict['左后门打开状态'] =="1":
car_Alarm.append("左后门未关闭")
ifcarStatus_dict['右前门打开状态'] =="1":
car_Alarm.append("右前门未关闭")
ifcarStatus_dict['右后门打开状态'] =="1":
car_Alarm.append("右后门未关闭")
print(car_Alarm)
else:
passiflen(car_Alarm)>0:
car_Alarm.append("数据更新时间:"+carStatus_dict['数据更新时间'])
withopen(r'car_Alarm.txt', "w+") asf1:
f1.write(str(car_Alarm))
mail("车辆报警!!!!!!", car_Alarm)


4、上传脚本

①打开宝塔

②点击“文件”

③上传

image.png

5、开启定时任务

①打开宝塔

②点击“计划任务”

image.png

6、结束

image.png

image.png































相关实践学习
一小时快速掌握 SQL 语法
本实验带您学习SQL的基础语法,快速入门SQL。
7天玩转云服务器
云服务器ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,可降低 IT 成本,提升运维效率。本课程手把手带你了解ECS、掌握基本操作、动手实操快照管理、镜像管理等。了解产品详情: https://www.aliyun.com/product/ecs
相关文章
|
12天前
|
安全 Java 数据处理
Python网络编程基础(Socket编程)多线程/多进程服务器编程
【4月更文挑战第11天】在网络编程中,随着客户端数量的增加,服务器的处理能力成为了一个重要的考量因素。为了处理多个客户端的并发请求,我们通常需要采用多线程或多进程的方式。在本章中,我们将探讨多线程/多进程服务器编程的概念,并通过一个多线程服务器的示例来演示其实现。
|
18天前
|
弹性计算
阿里云ECS使用体验
在申请高校学生免费体验阿里云ECS云服务器后的一些使用体验和感受。
|
3天前
|
存储 监控 安全
什么情况下物理服务器会运行出错?
物理服务器,也称为裸机服务器,一般可以提供高性能计算水平和巨大的存储容量。然而,它们也难免会遇到一些问题。运行出错时,可能会导致停机和数据丢失。
28 15
|
5天前
|
弹性计算
阿里云ECS的使用心得
本文主要讲述了我是如何了解到ECS,使用ECS的一些经验,以及自己的感悟心得
|
7天前
|
弹性计算 运维 安全
阿里云ecs使用体验
整了台服务器部署项目上线
|
10天前
|
弹性计算 应用服务中间件 Linux
阿里云ECS服务器上从零开始搭建nginx服务器
阿里云ECS服务器上从零开始搭建nginx服务器
|
12天前
|
弹性计算 安全
电子好书发您分享《阿里云第八代企业级ECS实例,为企业提供更安全的云上防护》
阿里云第八代ECS实例,搭载第五代英特尔至强处理器与飞天+CIPU架构,提升企业云服务安全与算力。[阅读详情](https://developer.aliyun.com/ebook/8303/116162?spm=a2c6h.26392459.ebook-detail.5.76bf7e5al1Zn4U) ![image](https://ucc.alicdn.com/pic/developer-ecology/cok6a6su42rzm_f422f7cb775444bbbfc3e61ad86800c2.png)
35 14
|
15天前
|
Python
Python网络编程基础(Socket编程)UDP服务器编程
【4月更文挑战第8天】Python UDP服务器编程使用socket库创建UDP套接字,绑定到特定地址(如localhost:8000),通过`recvfrom`接收客户端数据报,显示数据长度、地址和内容。无连接的UDP协议使得服务器无法主动发送数据,通常需应用层实现请求-响应机制。当完成时,用`close`关闭套接字。
|
15天前
|
数据采集 存储 人工智能
【python】python汽车效能数据集—回归建模(源码+数据集)【独一无二】
【python】python汽车效能数据集—回归建模(源码+数据集)【独一无二】
|
16天前
|
弹性计算 前端开发 Java
使用阿里云 mqtt serverless 版本超低成本快速实现 webscoket 长链接服务器
使用阿里云 MQTT Serverless 可轻松实现弹性伸缩的 WebSocket 服务,每日成本低至几元。适用于小程序消息推送的 MQTT P2P 模式。前端需注意安全,避免 AK 泄露,采用一机一密方案。后端通过调用 `RegisterDeviceCredential` API 发送消息。示例代码包括 JavaScript 前端连接和 Java 后端发送。
151 0