python调用spark示例

简介: python调用spark示例
# coding=utf-8importsysprint('sys.executable--',sys.executable)
importsklearnprint("Sklearn verion is {}".format(sklearn.__version__))
# reload(sys)# sys.setdefaultencoding("utf8")importpandasaspdimportnumpyasnpfromsklearn.linear_modelimportLinearRegressionfromdatetimeimportdate,datetime,timedeltafrompysparkimportSparkConffrompysparkimportSparkContextfrompyspark.sqlimportHiveContext,SQLContextfrompyspark.storagelevelimportStorageLevelfromsklearn.externalsimportjoblibfromsklearn.clusterimportKMeansimportpandasaspdif__name__=="__main__":
db_name=sys.argv[1] #0对应文件名data_date=sys.argv[2]
inc_day=data_date.replace('-','')
k=5iteration=1000print('db_name:{0}      data_date:{1}   inc_day:{2}'.format( db_name, data_date,inc_day))
#测试读取hive数据conf=SparkConf().setAppName("family")
sc=SparkContext(conf=conf)
hc=HiveContext(sc)
sqlContext=SQLContext(sc)
#查询a="""SELECT distinct      create_date                                as     `时间`,     depot_code                                 as     `场地编码`,     ascs_sorting_code                          as     `ascs分拣线code`,     ascs_distinct_waybill_count                as     `补码成功运单数`,     ascs_distinct_photo_count                  as     `补码成功图片数`,     ascs_has_face_count                        as     `有面单数`,     ascs_not_face_count                        as     `无面单数`,     ascs_timeout_photo_count                   as     `补码超时数`,     ascs_success_rate                          as     `补码成功率`,     ascs_pick_supply_count                     as     `分拣成功数`,     ascs_saving_hours                          as     `节约工时`,     ascs_total_no                              as     `自动化处理量`,     ascs_return_count                          as     `回流量`,     ascs_assist_nr_rate                        as     `辅助无读识别率`,     ascs_return_rate                           as     `回流挽回率`,     case when (ascs_distinct_waybill_count-ascs_pick_supply_count)>=0 then 'yes' else 'no' end  as `补码成功运单数是否大于等于分拣成功数`    FROM  dm_argus_sheduler_tmp.brfd_shizhong_buma_success_tmp_buma    where inc_day<='{inc_day}'""".format(inc_day=inc_day)
data_day=hc.sql(a)
# spark转pandasdata=data_day.toPandas()
print(data.columns)
print(data.dtypes)
print(data)
print('补码成功运单数小于分拣成功数的记录:',data[data['补码成功运单数是否大于等于分拣成功数']=='no'])
# inputfile = 'data.csv' #销量及其他属性数据# use_clos=['时间','ascs分拣线code','补码成功运单数','补码成功图片数','有面单数','无面单数',# '补码超时数','补码成功率','分拣成功数','自动化处理量','回流量','辅助无读识别率','回流挽回率'# ]# data = pd.read_csv(inputfile,usecols=use_clos) #读取数据# data=data.dropna(how='any')data=data.fillna(0)
# data=data[(data['ascs分拣线code']!=0)&(data['补码成功运单数']!=0)&(data['补码成功图片数']!=0)]data=data[(data['ascs分拣线code']!=0)]
data.set_index(['时间','ascs分拣线code'],drop=True,inplace=True)
data['补码图片数']=data['有面单数']+data['无面单数']+data['补码超时数']
data['补码超时率']=data['补码超时数']/data['补码图片数']
data['呈像系统有效性1']=data['有面单数']/(data['有面单数']+data['无面单数'])
data['呈像系统有效性2']=(data['补码成功运单数']/data['补码成功图片数'])
data['分拣环节有效性']=(data['分拣成功数']/data['补码成功运单数'])
data['辅助无读识别率']=data['辅助无读识别率']
data['回流挽回率']=data['回流挽回率']
# data=data.dropna(how='any')data=data[['自动化处理量','补码图片数','有面单数','无面单数','补码成功运单数','补码成功图片数','分拣成功数','辅助无读识别率','回流挽回率','补码成功率','补码超时率','呈像系统有效性1','呈像系统有效性2','分拣环节有效性']]
data_zs=1.0*(data-data.min())/data.max() #数据标准化data_zs=data_zs.fillna(data_zs.mean())
model=KMeans(n_clusters=k, n_jobs=4, max_iter=iteration) #分为k类,并发数4model.fit(data_zs) #开始聚类#整理输出数据#1.模型相关信息r1=pd.DataFrame(pd.Series(model.labels_).value_counts() ,columns=['各类数量'])#统计各个类别的数目r2=pd.DataFrame(model.cluster_centers_,columns=data.columns) #找出聚类中心cluster_cnt=pd.concat([r2, r1], axis=1) #横向连接(0是纵向),得到聚类中心对应的类别下的数目r2=r2.sort_values(by=['回流挽回率'],ascending=False)
new_label={ v:str(i+1)+'类'fori,vinenumerate(r2.index)}#可解释的类别映射cluster_cnt[u'聚类类别']=cluster_cnt.index.map(new_label)
cluster_cnt=cluster_cnt.sort_values(by=['聚类类别'])
cluster_cnt=cluster_cnt[['聚类类别','各类数量']]
cluster_cnt.columns=['centers_no','cnt']
print('各类数量:',cluster_cnt)
#2.聚类结果#关联可解释性类别标识作为index【入库保存】#详细输出原始数据及其类别r=pd.concat([data, pd.Series(model.labels_, index=data.index,name='聚类类别').map(new_label)], axis=1) 
#关联可解释性类别标识作为index【入库保存】r_=r.copy(deep=True)
r_=r_.reset_index()
r_=r_[r_['时间']==data_date]
r_=r_[['时间','ascs分拣线code','辅助无读识别率','回流挽回率','聚类类别']]
r_.columns=['date','ascs_code','fuzhuwudushibielv','huiliuwanhuilv','class']
print('聚类结果:',r_)
#pandas转sparkcluster_cnt=hc.createDataFrame(cluster_cnt)
r_=hc.createDataFrame(r_)
#建表hc.sql("set hive exec.dynamic.partition.mode =nonstrict")
hc.sql("set hive exec.dynamic.partition = true")
#各类数量sql_str="CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_1 (centers_no string, cnt string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
hc.sql(sql_str)
#聚类结果sql_str="CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_2 (date string, ascs_code string,fuzhuwudushibielv string ,huiliuwanhuilv string, class string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
hc.sql(sql_str)
#分区表可以先创建三种临时表hc.sql("use {db_name}".format(db_name=db_name))
cluster_cnt.registerTempTable("temp_table_1")
r_.registerTempTable("temp_table_2")
#插入方式三种hc.sql("""                   insert overwrite table {db_name}.brfd_ascs_kmeans_out_1 partition(inc_day='{inc_day}')                   select *                   from temp_table_1                   """.format(db_name=db_name,inc_day=inc_day)
               )
hc.sql("""                   insert overwrite table {db_name}.brfd_ascs_kmeans_out_2 partition(inc_day='{inc_day}')                   select *                   from temp_table_2                   """.format(db_name=db_name,inc_day=inc_day)
               )
# 
目录
相关文章
|
1月前
|
存储 Python
Python中如何读取和写入文件?请提供代码示例。
【2月更文挑战第26天】【2月更文挑战第87篇】Python中如何读取和写入文件?请提供代码示例。
|
1月前
|
Python
Python中的继承:概念、用法与示例
Python中的继承:概念、用法与示例
22 0
|
3月前
|
消息中间件 Kubernetes NoSQL
python 函数操作示例
python 函数操作示例
|
3月前
|
数据可视化 Python
python数据可视化 - matplotlib专题:带数据标签的双batch的Bar图绘制示例
python数据可视化 - matplotlib专题:带数据标签的双batch的Bar图绘制示例
36 0
|
15天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
1月前
|
算法 索引 Python
Python中如何实现二分查找?请提供代码示例。
Python中如何实现二分查找?请提供代码示例。
23 0
|
1月前
|
存储 机器学习/深度学习 数据挖掘
Python编程语言:基础知识与实用代码示例
本文将带您走进Python编程世界,介绍Python的基础知识,并通过实用代码示例展示Python的魅力和应用。
35 0
|
2天前
|
机器学习/深度学习 人工智能 算法
机械视觉:原理、应用及Python代码示例
机械视觉:原理、应用及Python代码示例
|
2月前
|
机器学习/深度学习 算法 Python
Python异或运算符示例
Python异或运算符示例
48 0
|
3月前
|
Java 数据库 Python
python flask 简单示例
python flask 简单示例
26 2