1. Overview
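Most of what our data warehouse's ADS layer produces is synced to MySQL for downstream dashboards and report queries. Still, a Leader often wants to glance at certain figures every day, and logging into the BI system is not always convenient. So we developers push the data to the Leader's mailbox at a fixed time each day for easy reading. Here we use Python 3 to read MaxCompute table data and save it as a file on a server, then read that file back from the server and send it by email.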
2. Implementation
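1. Preparation before sending
1. A mailbox with an SMTP authorization code set up
2. Because the job is scheduled on DataWorks, an exclusive resource group for scheduling is required
3. A PyODPS 3 node (script file)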
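The authorization code from step 1 is what the script later passes to the SMTP login as the password (the `passwd` variable). Rather than hard-coding it, one option is to read the mail settings from a mapping such as environment variables. The sketch below is only an illustration: the key names (MAIL_SMTP_HOST, MAIL_SMTP_PORT, MAIL_SENDER, MAIL_AUTH_CODE) and the `SmtpSettings` type are hypothetical, and whether you can set environment variables on your resource group is an assumption to check.

```python
from dataclasses import dataclass
from typing import Mapping

# Hypothetical key names; rename them to whatever your setup uses.
REQUIRED_KEYS = ("MAIL_SMTP_HOST", "MAIL_SMTP_PORT", "MAIL_SENDER", "MAIL_AUTH_CODE")


@dataclass(frozen=True)
class SmtpSettings:
    host: str
    port: int
    sender: str
    auth_code: str  # the mailbox's SMTP authorization code, used as the login password


def load_smtp_settings(env: Mapping[str, str]) -> SmtpSettings:
    """Build SMTP settings from a mapping such as os.environ.

    Raises ValueError listing every missing or empty key.
    """
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise ValueError("missing mail settings: " + ", ".join(missing))
    return SmtpSettings(
        host=env["MAIL_SMTP_HOST"],
        port=int(env["MAIL_SMTP_PORT"]),
        sender=env["MAIL_SENDER"],
        auth_code=env["MAIL_AUTH_CODE"],
    )
```

Usage would be `settings = load_smtp_settings(os.environ)`, after which `settings.auth_code` takes the place of the literal password string.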
2. Importing modules
Import the modules the script needs:
import datetime
import smtplib
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
import paramiko
# In a DataWorks PyODPS 3 node the ODPS entry object `o` is provided as a global,
# so `from odps import ODPS` is only needed when running outside DataWorks.
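If a module turns out to be unavailable in the ODPS environment, install it first:
1. In the resource group list, find the scheduling resource group you use and open its O&M Assistant
2. Create a command
3. Fill in the name and other fields, then enter the command content; here, for example, we need paramiko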
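To find out which modules actually need installing, you could run a quick check in a PyODPS 3 node on the same exclusive resource group. This is a sketch: the `missing_modules` helper is hypothetical, and the module list is an assumption (openpyxl is included because pandas typically uses it to write .xlsx files).

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names in `names` that cannot be imported here.

    find_spec() returns None for an unknown top-level module, so no module is
    actually imported. Use top-level names only (e.g. "odps", not "odps.df").
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


# Modules the mail script depends on; anything printed needs installing
# through the resource group's O&M Assistant.
REQUIRED = ["pandas", "paramiko", "openpyxl", "odps"]

if __name__ == "__main__":
    print(missing_modules(REQUIRED))
```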
3. Building the file path on the server
First, connect to the server and create the file that the ODPS data will later be written to:
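def server_path():
    # Connect to the server over SSH (host, user and password are placeholders)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname='xxx', port=22, username='xxx', password='xxx')
    # Opening over SFTP in write mode creates the file, so a separate
    # `touch` via exec_command is not needed
    sftp_client = ssh.open_sftp()
    file_path = sftp_client.open(r'/xxx/' + yesterday + "xxx.xlsx", 'w+')
    # Return the open remote file handle for the Excel writer
    return file_path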
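The remote path above is built by string concatenation, so a missing `/` in the directory part silently yields a wrong file name, and the writing and sending steps must build the name identically. A small helper can make the dated name explicit. The function names, the example directory and the suffix are hypothetical stand-ins for the elided `/xxx/` and `xxx.xlsx` parts.

```python
import datetime
import posixpath


def yesterday_of(today):
    """Return the calendar day before `today` (a datetime.date)."""
    return today - datetime.timedelta(days=1)


def dated_report_path(base_dir, report_date, suffix):
    """Build '<base_dir>/<YYYY-MM-DD><suffix>' for the remote Linux server.

    posixpath always joins with '/', so the result suits the server even if
    the script itself runs on another OS.
    """
    return posixpath.join(base_dir, report_date.strftime("%Y-%m-%d") + suffix)


# Example with a hypothetical directory and suffix
report = dated_report_path("/data/reports", yesterday_of(datetime.date(2024, 3, 1)), "_daily.xlsx")
print(report)  # /data/reports/2024-02-29_daily.xlsx
```

Calling the same helper in both the writing function and the sending function keeps the two file names in sync.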
4. Reading ODPS
Read the report data from ODPS and write it into the file on the server created above:
def odps_server_exce(file_path):
    # file_path is the SFTP file handle returned by server_path()
    writer = pd.ExcelWriter(file_path)
    # ODPS SQL statements for the two sheets
    odps_sql1 = 'SELECT xxx,xxx,xxx,xxx,xxx,xxx FROM xxx WHERE ds = "%s"' % bizdate
    odps_sql2 = 'SELECT xxx,xxx FROM xxx WHERE ds = "%s"' % bizdate
    # Read the query results into lists of rows; iterating a record yields
    # (column name, value) pairs, so only the value is kept
    with o.execute_sql(odps_sql1).open_reader() as reader:
        alist1 = [[value for _, value in record] for record in reader]
    with o.execute_sql(odps_sql2).open_reader() as reader:
        alist2 = [[value for _, value in record] for record in reader]
    # Build Sheet 1 and set its header
    df = pd.DataFrame(alist1, columns=["xxx", "xxx", "xxx", "xxx", "xxx", "xxx"])
    df.to_excel(writer, sheet_name="xxx", index=False)
    # Build Sheet 2 and set its header
    df = pd.DataFrame(alist2, columns=["xxx", "xxx"])
    df.to_excel(writer, sheet_name="xxx", index=False)
    # close() writes the workbook out (writer.save() was removed in pandas 2.0)
    writer.close()
    # Close the file opened with sftp_client.open()
    file_path.close()
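The column headers above are typed by hand, so they can drift from the SQL. Since iterating a record yields `(column name, value)` pairs (which is what the value-only list relies on), the header could instead be taken from the records themselves. The `records_to_table` helper below is a hypothetical sketch; it is tested here with plain lists of pairs standing in for PyODPS records.

```python
def records_to_table(records):
    """Turn PyODPS-style records into (header, rows).

    Assumes each record iterates as (column_name, value) pairs. The header is
    taken from the first record, so an empty result gives ([], []).
    """
    header = []
    rows = []
    for record in records:
        pairs = list(record)
        if not rows:
            header = [name for name, _ in pairs]
        rows.append([value for _, value in pairs])
    return header, rows
```

Inside the `with ... open_reader() as reader:` block, `header, rows = records_to_table(reader)` followed by `pd.DataFrame(rows, columns=header)` would replace the hard-coded column list.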
5. Sending the email
Compose the email subject, body and attachment, then send it:
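def send_mail(path):
    # Connect to the server and open the Excel file in binary mode
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname='xxx', port=22, username='xxx', password='xxx')
    sftp_client = ssh.open_sftp()
    file_path = sftp_client.open(r'/xxx/' + yesterday + "xxx.xlsx", 'rb')

    smtp_server = 'smtp.xxx'   # SMTP server
    sender = 'xxx'             # sender address
    passwd = 'xxx'             # mailbox authorization code
    receiver = ['xxx', 'xxx', 'xxx']  # recipients
    cc = ['xxx', 'xxx']        # CC recipients

    # multipart/mixed (the default) is the type for a body plus attachments
    message = MIMEMultipart()
    title = yesterday + "_xxx"  # subject
    message['Subject'] = Header(title, 'utf-8')
    message['From'] = sender
    message['To'] = ','.join(receiver)
    message['Cc'] = ','.join(cc)
    # Email body
    message.attach(MIMEText('''xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx''', 'plain', 'utf-8'))

    # Attachment: the Excel bytes, base64-encoded. replace_header() swaps the
    # Content-Type instead of adding a second one, as item assignment would.
    att1 = MIMEText(file_path.read(), 'base64', 'utf-8')
    att1.replace_header('Content-Type', 'application/octet-stream')
    # For a non-ASCII file name, pass filename=('utf-8', '', name) instead
    att1.add_header('Content-Disposition', 'attachment', filename=yesterday + "_xxx.xlsx")
    message.attach(att1)

    # Send over SSL; xxx is the SMTP SSL port (placeholder)
    smtp = smtplib.SMTP_SSL(smtp_server, xxx)
    smtp.login(sender, passwd)
    smtp.sendmail(sender, receiver + cc, message.as_string())
    smtp.quit()
    print("Sent successfully")
    # Close the SFTP file and the SSH connection
    file_path.close()
    ssh.close()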
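As an alternative to assembling `MIMEMultipart`/`MIMEText` parts by hand, the standard library's newer `email.message.EmailMessage` API can build the same message with a correctly typed .xlsx attachment, and `smtplib`'s `send_message()` reads the recipients from the To/Cc headers. This is a sketch, not the author's method; the function names and the comma-separated address format are assumptions.

```python
import smtplib
from email.message import EmailMessage

XLSX_SUBTYPE = "vnd.openxmlformats-officedocument.spreadsheetml.sheet"


def split_addresses(value):
    """'a@x.com, b@x.com' -> ['a@x.com', 'b@x.com'], dropping empty items."""
    return [addr.strip() for addr in value.split(",") if addr.strip()]


def build_report_message(sender, to, cc, subject, body, filename, data):
    """Build a plain-text email with one .xlsx attachment.

    `to` and `cc` are comma-separated address strings; `data` is the file as bytes.
    """
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(split_addresses(to))
    if cc:
        msg["Cc"] = ", ".join(split_addresses(cc))
    msg.set_content(body)  # non-ASCII text is encoded as UTF-8 automatically
    msg.add_attachment(data, maintype="application", subtype=XLSX_SUBTYPE, filename=filename)
    return msg


def send_report(msg, host, port, sender, auth_code):
    """Send over SMTP with SSL; recipients come from the To/Cc headers."""
    with smtplib.SMTP_SSL(host, port) as smtp:
        smtp.login(sender, auth_code)
        smtp.send_message(msg)
```

With this approach, `data` would be `file_path.read()` from the SFTP handle, and the message is sent without calling `as_string()` or splitting headers manually.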
6. Full script
import datetime
import smtplib
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd
import paramiko

# Business date (yesterday) in the two formats used below
bizdate = (datetime.date.today() + datetime.timedelta(days=-1)).strftime("%Y%m%d")
yesterday = (datetime.date.today() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")


def server_path():
    # Connect to the server and open the dated Excel file for writing over SFTP
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname='xxx', port=22, username='xxx', password='xxx')
    sftp_client = ssh.open_sftp()
    file_path = sftp_client.open(r'/xxx/' + yesterday + "xxx.xlsx", 'w+')
    return file_path


def odps_server_exce(file_path):
    writer = pd.ExcelWriter(file_path)
    # ODPS SQL statements
    odps_sql1 = 'SELECT xxx,xxx,xxx,xxx,xxx,xxx FROM xxx WHERE ds = "%s"' % bizdate
    odps_sql2 = 'SELECT xxx,xxx FROM xxx WHERE ds = "%s"' % bizdate
    # Read the results into lists of rows (each record yields (name, value) pairs)
    with o.execute_sql(odps_sql1).open_reader() as reader:
        alist1 = [[value for _, value in record] for record in reader]
    with o.execute_sql(odps_sql2).open_reader() as reader:
        alist2 = [[value for _, value in record] for record in reader]
    # Build Sheet 1 and set its header
    df = pd.DataFrame(alist1, columns=["xxx", "xxx", "xxx", "xxx", "xxx", "xxx"])
    df.to_excel(writer, sheet_name="xxx", index=False)
    # Build Sheet 2 and set its header
    df = pd.DataFrame(alist2, columns=["xxx", "xxx"])
    df.to_excel(writer, sheet_name="xxx", index=False)
    writer.close()
    # Close the file opened with sftp_client.open()
    file_path.close()


def send_mail(path):
    # Connect to the server and open the Excel file in binary mode
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname='xxx', port=22, username='xxx', password='xxx')
    sftp_client = ssh.open_sftp()
    file_path = sftp_client.open(r'/xxx/' + yesterday + "xxx.xlsx", 'rb')

    smtp_server = 'smtp.xxx'   # SMTP server
    sender = 'xxx'             # sender address
    passwd = 'xxx'             # mailbox authorization code
    receiver = ['xxx', 'xxx', 'xxx']  # recipients
    cc = ['xxx', 'xxx']        # CC recipients

    message = MIMEMultipart()
    title = yesterday + "_xxx"  # subject
    message['Subject'] = Header(title, 'utf-8')
    message['From'] = sender
    message['To'] = ','.join(receiver)
    message['Cc'] = ','.join(cc)
    # Email body
    message.attach(MIMEText('''xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx''', 'plain', 'utf-8'))
    # Attachment
    att1 = MIMEText(file_path.read(), 'base64', 'utf-8')
    att1.replace_header('Content-Type', 'application/octet-stream')
    att1.add_header('Content-Disposition', 'attachment', filename=yesterday + "_xxx.xlsx")
    message.attach(att1)
    # Send over SSL; xxx is the SMTP SSL port (placeholder)
    smtp = smtplib.SMTP_SSL(smtp_server, xxx)
    smtp.login(sender, passwd)
    smtp.sendmail(sender, receiver + cc, message.as_string())
    smtp.quit()
    print("Sent successfully")
    # Close the SFTP file and the SSH connection
    file_path.close()
    ssh.close()


if __name__ == '__main__':
    # `o`, the ODPS entry object, is provided by the DataWorks PyODPS 3 node
    file_path = server_path()
    odps_server_exce(file_path)
    send_mail(file_path)
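7. Setting up the schedule
In the scheduling configuration panel on the right side of the DataWorks node, set the time at which the email should go out, choose the exclusive resource group for scheduling, and make the node depend on the upstream node that produces the ODPS table.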
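The script derives `bizdate` from the current date, so a rerun or backfill on a later day would report the wrong day. PyODPS nodes in DataWorks expose scheduling parameters through the global `args` dict, so one option is to prefer a parameter when present. This sketch assumes a scheduling parameter named `bizdate` in `YYYYMMDD` form (for example configured as `bizdate=$bizdate`); the key name and the `resolve_dates` helper are assumptions.

```python
import datetime


def resolve_dates(args, today=None):
    """Return (bizdate 'YYYYMMDD', yesterday 'YYYY-MM-DD').

    Uses args['bizdate'] (assumed key, 'YYYYMMDD' format) when it is set;
    otherwise falls back to `today` minus one day, like the script above.
    """
    raw = args.get("bizdate") if args else None
    if raw:
        day = datetime.datetime.strptime(raw, "%Y%m%d").date()
    else:
        day = (today or datetime.date.today()) - datetime.timedelta(days=1)
    return day.strftime("%Y%m%d"), day.strftime("%Y-%m-%d")
```

In the node, `bizdate, yesterday = resolve_dates(args)` would replace the two date lines at the top of the script.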
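3. Summary
That is how we use Python 3 to implement a scheduled email push on DataWorks. There are surely many other ways to do it; since we rely on Alibaba Cloud components, this is the approach we currently use. If you spot any shortcomings, corrections are welcome.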
Bye-bye!