python调用spark示例

简介: python调用spark示例
# coding=utf-8importsysprint('sys.executable--',sys.executable)
importsklearnprint("Sklearn verion is {}".format(sklearn.__version__))
# reload(sys)# sys.setdefaultencoding("utf8")importpandasaspdimportnumpyasnpfromsklearn.linear_modelimportLinearRegressionfromdatetimeimportdate,datetime,timedeltafrompysparkimportSparkConffrompysparkimportSparkContextfrompyspark.sqlimportHiveContext,SQLContextfrompyspark.storagelevelimportStorageLevelfromsklearn.externalsimportjoblibfromsklearn.clusterimportKMeansimportpandasaspdif__name__=="__main__":
db_name=sys.argv[1] #0对应文件名data_date=sys.argv[2]
inc_day=data_date.replace('-','')
k=5iteration=1000print('db_name:{0}      data_date:{1}   inc_day:{2}'.format( db_name, data_date,inc_day))
#测试读取hive数据conf=SparkConf().setAppName("family")
sc=SparkContext(conf=conf)
hc=HiveContext(sc)
sqlContext=SQLContext(sc)
#查询a="""SELECT distinct      create_date                                as     `时间`,     depot_code                                 as     `场地编码`,     ascs_sorting_code                          as     `ascs分拣线code`,     ascs_distinct_waybill_count                as     `补码成功运单数`,     ascs_distinct_photo_count                  as     `补码成功图片数`,     ascs_has_face_count                        as     `有面单数`,     ascs_not_face_count                        as     `无面单数`,     ascs_timeout_photo_count                   as     `补码超时数`,     ascs_success_rate                          as     `补码成功率`,     ascs_pick_supply_count                     as     `分拣成功数`,     ascs_saving_hours                          as     `节约工时`,     ascs_total_no                              as     `自动化处理量`,     ascs_return_count                          as     `回流量`,     ascs_assist_nr_rate                        as     `辅助无读识别率`,     ascs_return_rate                           as     `回流挽回率`,     case when (ascs_distinct_waybill_count-ascs_pick_supply_count)>=0 then 'yes' else 'no' end  as `补码成功运单数是否大于等于分拣成功数`    FROM  dm_argus_sheduler_tmp.brfd_shizhong_buma_success_tmp_buma    where inc_day<='{inc_day}'""".format(inc_day=inc_day)
data_day=hc.sql(a)
# spark转pandasdata=data_day.toPandas()
print(data.columns)
print(data.dtypes)
print(data)
print('补码成功运单数小于分拣成功数的记录:',data[data['补码成功运单数是否大于等于分拣成功数']=='no'])
# inputfile = 'data.csv' #销量及其他属性数据# use_clos=['时间','ascs分拣线code','补码成功运单数','补码成功图片数','有面单数','无面单数',# '补码超时数','补码成功率','分拣成功数','自动化处理量','回流量','辅助无读识别率','回流挽回率'# ]# data = pd.read_csv(inputfile,usecols=use_clos) #读取数据# data=data.dropna(how='any')data=data.fillna(0)
# data=data[(data['ascs分拣线code']!=0)&(data['补码成功运单数']!=0)&(data['补码成功图片数']!=0)]data=data[(data['ascs分拣线code']!=0)]
data.set_index(['时间','ascs分拣线code'],drop=True,inplace=True)
data['补码图片数']=data['有面单数']+data['无面单数']+data['补码超时数']
data['补码超时率']=data['补码超时数']/data['补码图片数']
data['呈像系统有效性1']=data['有面单数']/(data['有面单数']+data['无面单数'])
data['呈像系统有效性2']=(data['补码成功运单数']/data['补码成功图片数'])
data['分拣环节有效性']=(data['分拣成功数']/data['补码成功运单数'])
data['辅助无读识别率']=data['辅助无读识别率']
data['回流挽回率']=data['回流挽回率']
# data=data.dropna(how='any')data=data[['自动化处理量','补码图片数','有面单数','无面单数','补码成功运单数','补码成功图片数','分拣成功数','辅助无读识别率','回流挽回率','补码成功率','补码超时率','呈像系统有效性1','呈像系统有效性2','分拣环节有效性']]
data_zs=1.0*(data-data.min())/data.max() #数据标准化data_zs=data_zs.fillna(data_zs.mean())
model=KMeans(n_clusters=k, n_jobs=4, max_iter=iteration) #分为k类,并发数4model.fit(data_zs) #开始聚类#整理输出数据#1.模型相关信息r1=pd.DataFrame(pd.Series(model.labels_).value_counts() ,columns=['各类数量'])#统计各个类别的数目r2=pd.DataFrame(model.cluster_centers_,columns=data.columns) #找出聚类中心cluster_cnt=pd.concat([r2, r1], axis=1) #横向连接(0是纵向),得到聚类中心对应的类别下的数目r2=r2.sort_values(by=['回流挽回率'],ascending=False)
new_label={ v:str(i+1)+'类'fori,vinenumerate(r2.index)}#可解释的类别映射cluster_cnt[u'聚类类别']=cluster_cnt.index.map(new_label)
cluster_cnt=cluster_cnt.sort_values(by=['聚类类别'])
cluster_cnt=cluster_cnt[['聚类类别','各类数量']]
cluster_cnt.columns=['centers_no','cnt']
print('各类数量:',cluster_cnt)
#2.聚类结果#关联可解释性类别标识作为index【入库保存】#详细输出原始数据及其类别r=pd.concat([data, pd.Series(model.labels_, index=data.index,name='聚类类别').map(new_label)], axis=1) 
#关联可解释性类别标识作为index【入库保存】r_=r.copy(deep=True)
r_=r_.reset_index()
r_=r_[r_['时间']==data_date]
r_=r_[['时间','ascs分拣线code','辅助无读识别率','回流挽回率','聚类类别']]
r_.columns=['date','ascs_code','fuzhuwudushibielv','huiliuwanhuilv','class']
print('聚类结果:',r_)
#pandas转sparkcluster_cnt=hc.createDataFrame(cluster_cnt)
r_=hc.createDataFrame(r_)
#建表hc.sql("set hive exec.dynamic.partition.mode =nonstrict")
hc.sql("set hive exec.dynamic.partition = true")
#各类数量sql_str="CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_1 (centers_no string, cnt string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
hc.sql(sql_str)
#聚类结果sql_str="CREATE TABLE IF NOT EXISTS {db_name}.brfd_ascs_kmeans_out_2 (date string, ascs_code string,fuzhuwudushibielv string ,huiliuwanhuilv string, class string) partitioned BY (inc_day string) row format delimited fields terminated BY '|'".format(db_name=db_name)
hc.sql(sql_str)
#分区表可以先创建三种临时表hc.sql("use {db_name}".format(db_name=db_name))
cluster_cnt.registerTempTable("temp_table_1")
r_.registerTempTable("temp_table_2")
#插入方式三种hc.sql("""                   insert overwrite table {db_name}.brfd_ascs_kmeans_out_1 partition(inc_day='{inc_day}')                   select *                   from temp_table_1                   """.format(db_name=db_name,inc_day=inc_day)
               )
hc.sql("""                   insert overwrite table {db_name}.brfd_ascs_kmeans_out_2 partition(inc_day='{inc_day}')                   select *                   from temp_table_2                   """.format(db_name=db_name,inc_day=inc_day)
               )
# 
目录
相关文章
|
4月前
|
JSON API 数据格式
洋码头商品 API 示例指南(Python 实现)
洋码头是国内知名跨境电商平台,提供商品搜索、详情、分类等API接口。本文详解了使用Python调用这些API的流程与代码示例,涵盖签名生成、请求处理及常见问题解决方案,适用于构建选品工具、价格监控等跨境电商应用。
|
4月前
|
缓存 JSON API
VIN车辆识别码查询车五项 API 实践指南:让每一俩车有迹可循(Python代码示例)
VIN(车辆识别代码)是全球唯一的17位汽车标识码,可快速获取车架号、发动机号、品牌型号等核心信息。在二手车交易、保险理赔、维修保养等场景中,准确解析VIN有助于提升效率与风控能力。本文介绍VIN码结构、适用场景,并提供Python调用示例及优化建议,助力企业实现车辆信息自动化核验。
855 1
|
4月前
|
JSON API UED
运营商二要素验证 API:核验身份的一致性技术实践(Python示例)
随着线上业务快速发展,远程身份核验需求激增。运营商二要素验证API通过对接三大运营商实名数据,实现姓名、手机号、身份证号的一致性校验,具备权威性高、实时性强的优势,广泛应用于金融、电商、政务等领域。该接口支持高并发、低延迟调用,结合Python示例可快速集成,有效提升身份认证的安全性与效率。
547 0
|
4月前
|
JSON API 数据格式
Python采集京东商品评论API接口示例,json数据返回
下面是一个使用Python采集京东商品评论的完整示例,包括API请求、JSON数据解析
|
5月前
|
JSON 缓存 API
身份证二要素核验接口调用指南 —— Python 示例
本文介绍如何在 Python 中快速实现身份证二要素核验功能,适用于用户注册、金融风控等场景。通过阿里云市场提供的接口,可校验「姓名 + 身份证号」的一致性,并获取性别、生日、籍贯等信息。示例代码展示了从环境变量读取 APP_CODE、发送 GET 请求到解析 JSON 响应的完整流程。关键字段包括 code(1-一致,2-不一致,3-无记录)、msg 和 data。常见问题如 403 错误需检查 AppCode,超时则优化网络或设置重试机制。集成后可根据业务需求添加缓存、限流等功能提升性能。
536 4
|
8月前
|
XML JSON API
淘宝商品详情API的调用流程(python请求示例以及json数据示例返回参考)
JSON数据示例:需要提供一个结构化的示例,展示商品详情可能包含的字段,如商品标题、价格、库存、描述、图片链接、卖家信息等。考虑到稳定性,示例应基于淘宝开放平台的标准响应格式。
|
6月前
|
SQL 数据库 开发者
Python中使用Flask-SQLAlchemy对数据库的增删改查简明示例
这样我们就对Flask-SQLAlchemy进行了一次简明扼要的旅程,阐述了如何定义模型,如何创建表,以及如何进行基本的数据库操作。希望你在阅读后能对Flask-SQLAlchemy有更深入的理解,这将为你在Python世界中从事数据库相关工作提供极大的便利。
655 77
|
4月前
|
测试技术 API 开发者
淘宝关键词搜索商品列表API接入指南(含Python示例)
淘宝关键词搜索商品列表API是淘宝开放平台的核心接口,支持通过关键词检索商品,适用于比价、选品、市场分析等场景。接口提供丰富的筛选与排序功能,返回结构化数据,含商品ID、标题、价格、销量等信息。开发者可使用Python调用,需注意频率限制与错误处理,建议先在沙箱环境测试。
|
3月前
|
数据采集 索引 Python
Python Slice函数使用教程 - 详解与示例 | Python切片操作指南
Python中的`slice()`函数用于创建切片对象,以便对序列(如列表、字符串、元组)进行高效切片操作。它支持指定起始索引、结束索引和步长,提升代码可读性和灵活性。
|
4月前
|
JSON API 开发者
一号店商品 API 示例指南(Python 实现)
本教程介绍如何使用 Python 调用一号店商品 API,涵盖商品搜索、详情、分类等接口的调用方法。内容包括注册认证、签名生成、代码实现及常见问题解决方案,并提供完整示例代码,帮助开发者快速接入一号店开放平台,构建电商工具与数据分析应用。

推荐镜像

更多