一、实验数据
链接:https://pan.baidu.com/s/1CSgRQ8OXe50ya_DwkLX8yw
提取码:kexq
二、实验背景
1.1 实验背景
(1)电力设备在线监测 电力设备在线监测指在不停电的情况下,对电力设备状况进行连续或周期性地自动监视检测,使用的技术包括:传感器技术、广域通信技术和信息处理技术。 电力设备在线监测是实现电力设备状态运行检修管理、提升生产运行管理精益化水平的重要手段,对提升电网智能化水平、实现电力设备状态运行管理具有积极而深远的意义。
(2) 监测大数据
智能电网建设的推进,智能化电力一次设备和常规电力设备在线监测得到了较大发展并成为趋势,监测数据变得日益庞大,逐渐构成电力设备监测大数据,这给电力设备在线监测系统在数据存储和处理方面带来非常大的技术挑战。
电力设备监测大数据具有体量大、类型多、价值密度低和处理速度快的特点。电网公司监测系统目前过于依赖集中式SAN存储,并基于SOA进行数据集成,主要采用“企业级关系型数据库”,受容量、扩展性以及访问速度的制约,目前只存储二次加工的“熟数据”,而所擅长的关联查询、事务处理在数据分析时又无用武之地,迫切需要新的大数据存储和处理技术来应对。
(3)典型的大数据计算场景
批量计算:其计算特点是定时发生、批量处理、数据量大、实时性差。批量计算的工具包括:阿里云MaxCompute、Hadoop MapReduce等。
在线计算:其计算特点是用户触发、频繁交互、快速响应。在线计算工具有:阿里云分析型数据库等。
流式计算:其计算特点是消息触发、单条处理、实时计算。流式计算工具有:阿里云流计算、Apache Storm等。
(4)局部放电相位分析
局部放电相位分析(phase resolved partial discharge, PRPD)将多个工频周期内监测所得的局部放电参数(放电次数N、视在放电量Q或放电幅值,及放电所在相位φ)折算到一个工频周期内,计算其统计规律性,获取放电谱图,统计放电特征,用于模式识别。
PD信号分析主要包括三个子过程:1)基本参数n-q-φ的提取。扫描PD信号,统计信号中的放电峰值和相应的放电相位。2)谱图构造和统计特征计算。划分相窗,统计平均放电量和放电次数的分布,计算平均放电量相位分布谱图;3)放电类型的识别。
偏斜度Sk反映了谱图形状相对于正态分布形状的偏斜程度
陡峭度Ku反映了谱图形状相对于正态分布形状的突起程度
(5) MaxCompute 大数据计算服务(MaxCompute,原ODPS) 是阿里云提供的海量数据处理平台。主要服务于批量结构化数据的存储和计算,数据规模达EB级别。MaxCompute目前已在大型互联网企业的数据仓库和BI分析、网站的日志分析、电子商务网站的交易分析等领域得到大规模应用。MaxCompute中的数据是按照project、table、partition来组织的。
1)Project(项目空间):
Project是MaxCompute的基本组织单元,它类似于传统数据库的 Database或Schema的概念,是进行多用户隔离和访问控制的主要边界。一个用户可以同时拥有多个项目空间的权限,通过安全授权,可以在一个项目空间中访问另一个项目空间中的对象,例如:表(Table),资源(Resource),函数(Function)和实例(Instance)。
2)Table(表)
表是MaxCompute的数据存储单元,它在逻辑上也是由行和列组成的二维结构,每行代表一条记录,每列表示相同数据类型的一个字段,一条记录可以包含一个或多个列,各个列的名称和类型构成这张表的Schema。
3)Partition(分区)
在创建表时指定分区空间,即指定表内的某几个字段作为分区列。可以将分区类比为文件系统下的目录。MaxCompute 将分区列的每个值作为一个分区(目录),用户可以指定多级分区,即将表的多个字段作为表的分区,分区之间正如多级目录的关系。使用数据时,如果指定需要访问的分区名称,则只会读取相应的分区,可避免全表扫描,提高处理效率,降低费用。
4)Task(任务)
Task是MaxCompute的基本计算单元。SQL及MapReduce功能都是通过任务完成的。Task包括:计算型任务:SQL DML语句,MapReduce;非计算型任务:SQL中的DDL(无执行计划)。
5)Instance(实例)
在MaxCompute中,部分任务在执行时会被实例化,以MaxCompute实例(下文简称实例或Instance)的形式存在。实例会经历运行(Running)及结束(Terminated)两个阶段。
(6)odpscmd
odpscmd是一个Java程序,可以以命令方式访问MaxCompute。应用该客户端,可以完成包括数据查询、数据上传、下载等各种任务。需要JRE环境才能运行,请下载并安装JRE 1.6+版本。
(7)MapReduce
MapReduce最早是由Google提出的分布式数据处理模型,随后受到了业内的广泛关注,并被大量应用到各种商业场景中。比如搜索、Web访问日志分析、文本统计分析、海量数据挖掘、机器学习、自然语言处理、广告推荐等。
MapReduce处理数据过程主要分成2个阶段:Map阶段和Reduce阶段。首先执行Map阶段,再执行Reduce阶段。Map和Reduce的处理逻辑由用户自定义实现,但要符合MapReduce框架的约定。
二、实验步骤
2.1 申请阿里云MaxCompute
2.2 创建项目,建表和数据上传
登录管理控制台,浏览MaxCompute项目
根据实验分配的资源,使用分配的账号和密码登录阿里云管理控制台。
进入阿里云管理控制台,并从左侧导航栏选择“大数据(数加)-->DataWorks”,进入DataWorks首頁面。在这里能够看到已经创建好的MaxCompute项目。
安装配置odpscmd
在本地准备好JRE环境,请下载并安装JRE 1.6+版本。
从阿里云官网下载odpscmd工具: https://github.com/aliyun/aliyun-odps-console/releases?spm=a2c4g.11186623.2.15.ed2fa95ecaTeNe
解压缩,并配置<ODPS_CLIENT>/conf/odps_config.ini
project_name=[project_name] access_id=****************** access_key=********************* end_point=http://service.odps.aliyun.com/api tunnel_endpoint=http://dt.odps.aliyun.com log_view_host=http://logview.odps.aliyun.comhttps_check=true
- 建表并添加分区 (1)创建放电谱图表DW_PT
在odpscmd中,执行下面的SQL语句,建表。
create table if not exists DW_PT( PID string, ---’谱图ID’ WID bigint, ---’窗口ID’ TotalQ bigint, ---’总放电量’ AvgQ double, ---‘平均放电量’ Nums bigint, ---‘放电次数’ MaxV bigint) ---‘放电峰值’ partitioned by (DeviceID string);
当出现“ok”,表示建表成功,可以使用“ls tables;”命令查看已经创建的表。
为DW_PT表添加分区。
alter table DW_PT add if not exists partition(DeviceID='001'); 可以使用“show partitions DW_PT;”验证添加的分区是否成功。
(2) 创建统计特征表
在odpscmd中,执行下面的SQL语句,建表。 CREATE TABLE IF NOT EXISTS DW_TJ ( PID STRING, -- -’谱图ID’ SKN DOUBLE, -- -’放电次数偏斜度’ SKQ DOUBLE, -- -’放电量偏斜度’ KUN DOUBLE, -- -’ 放电次数陡峭度’ KUQ DOUBLE -- -’ 放电量陡峭度’ ) PARTITIONED BY ( DeviceID STRING ); 添加分区 alter table DW_TJ add if not exists partition(DeviceID='001');
使用Tunnel进行数据上传 本次实验会使用谱图数据(DW_PT)进行计算,得到统计特征数据(DW_TJ)。用户可以直接将实验提供的谱图数据上传,并用于计算。谱图数据文件是附件中的pt.csv。
在odpscmd中运行tunnel命令,将本地数据文件pt.csv上传至DW_PT表。下面的命令中的路径,请在执行时根据实际路径进行修改。
tunnel upload d:/Clouder/jfdata/pt.csv DW_PT/deviceid=’001’;
2.3 MapReduce程序开发、本地调试和运行
本地开发环境准备 本次实验使用Eclipse作为开发环境,请提前下载并安装。
官网导航中找到并下载 ODPS for eclipse 插件,并将插件解压并复制到Eclipse安装目录下的plugins子目录下。启动Eclipse,检查Wizard选项里面是否有ODPS的目录。
ODPS for eclipse 插件下载地址:
当可以创建ODPS类型的项目时,表示本地开发环境已经准备好了。
MapReduce程序开发
在Eclipse中创建ODPS项目,命名为TJ。为了让Eclipse能正确访问MaxCompute,需要在创建项目的时候正确配置odpscmd的本地路径。
依次添加Mapper类、Reducer类、MapReduce Driver类和R类。
TJMapper.java代码如下:
import java.io.IOException; import com.aliyun.odps.data.Record; import com.aliyun.odps.mapred.MapperBase; public class TJMapper extends MapperBase { private Record val; private Record k; @Override public void setup(TaskContext context) throws IOException { k=context.createMapOutputKeyRecord(); val=context.createMapOutputValueRecord(); } @Override public void map(long recordNum, Record record, TaskContext context) throws IOException { String pid= record.getString("pid"); k.set(new Object[] {pid}); val=record; context.write(k,val); } @Override public void cleanup(TaskContext context) throws IOException { } }
TJReducer.java代码如下:
import java.io.IOException; import java.util.Iterator; import java.util.*; import com.aliyun.odps.data.Record; import com.aliyun.odps.mapred.ReducerBase; public class TJReducer extends ReducerBase { private Record result; private long alln=0;//总放电次数 private long allq=0;//总放电量 private List<R> recs; @Override public void setup(TaskContext context) throws IOException { result=context.createOutputRecord(); recs=new ArrayList<R>(); } @Override public void reduce(Record key, Iterator<Record> values, TaskContext context)throws IOException { float a_n=0;//次数均值 float a_q=0;//放电量均值 float f_n=0;//次数方差 float f_q=0;//放电量方差 while (values.hasNext()) { Record temp=values.next(); R r=new R(); r.nums=temp.getBigint("nums"); r.avgq=temp.getDouble("avgq"); r.totalq=temp.getBigint("totalq"); r.max=temp.getBigint("maxv"); r.wid=temp.getBigint("wid"); recs.add(r); alln=alln+temp.getBigint("nums"); allq=allq+temp.getBigint("totalq"); } //均值计算 for(int i=0;i<recs.size();i++) { recs.get(i).n_p=(float)recs.get(i).nums/alln; recs.get(i).q_p=(float)recs.get(i).totalq/allq; a_n=a_n+recs.get(i).n_p*recs.get(i).wid; a_q=a_q+recs.get(i).q_p*recs.get(i).wid; } //方差计算 for(int i=0;i<recs.size();i++) { float a=recs.get(i).n_p*(float)Math.pow(recs.get(i).wid-a_n, 2); f_n=f_n+a; float b=recs.get(i).q_p*(float)Math.pow(recs.get(i).wid-a_q, 2); f_q=f_q+b; } f_n=(float)Math.sqrt((double)f_n); f_q=(float)Math.sqrt((double)f_q); //sk计算公式的分子 float ln=0; float lq=0; //计算sk for(int i=0;i<recs.size();i++) { float a=recs.get(i).n_p*(float)Math.pow(recs.get(i).wid-a_n, 3); ln=ln+a; float b=recs.get(i).q_p*(float)Math.pow(recs.get(i).wid-a_q, 3); lq=lq+b; } double sk_n=ln/Math.pow(f_n, 3); double sk_q=lq/Math.pow(f_q, 3); //添加结果集 result.set("pid", key.getString(0)); result.set("skn", sk_n); result.set("skq", sk_q); //重置临时累计变量 ln=0; lq=0; //计算正半轴ku for(int i=0;i<recs.size();i++) { float a=recs.get(i).n_p*(float)Math.pow(recs.get(i).wid-a_n, 4); ln=ln+a; float b=recs.get(i).q_p*(float)Math.pow(recs.get(i).wid-a_q, 4); lq=lq+b; } double ku_n=ln/Math.pow(f_n, 4)-3; double ku_q=lq/Math.pow(f_q, 4)-3; //添加结果集 result.set("kun", ku_n); result.set("kuq", ku_q); context.write(result); } @Override public void cleanup(TaskContext context) throws IOException { } }
TJDriver.java代码如下:
import com.aliyun.odps.OdpsException; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.JobClient; import com.aliyun.odps.mapred.RunningJob; import com.aliyun.odps.mapred.conf.JobConf; import com.aliyun.odps.mapred.utils.InputUtils; import com.aliyun.odps.mapred.utils.OutputUtils; import com.aliyun.odps.mapred.utils.SchemaUtils; public class TJDriver { public static void main(String[] args) throws OdpsException { JobConf job = new JobConf(); // TODO: specify map output types job.setMapOutputKeySchema(SchemaUtils.fromString("pid:string")); job.setMapOutputValueSchema(SchemaUtils.fromString("pid:string,wid:bigint,totalq:bigint,avgq:double,nums:bigint,maxv:bigint")); // TODO: specify input and output tables InputUtils.addTable(TableInfo.builder().tableName(args[0]).partSpec("deviceid=001").build(),job); OutputUtils.addTable(TableInfo.builder().tableName(args[1]).partSpec("deviceid=001").build(),job); // TODO: specify a mapper job.setMapperClass(TJMapper.class); // TODO: specify a reducer job.setReducerClass(TJReducer.class); RunningJob rj = JobClient.runJob(job); rj.waitForCompletion(); } }
R.java的代码如下:
public class R { public long wid;//窗ID public long totalq;//一个窗的总放电量 public double avgq;//一个窗的平均放电量 public long nums;//一个窗的放电次数 public long max;//一个窗的的放电最大值 public float n_p; //一个窗的的放电次数概率 public float q_p;//一个窗的的放电量概率 public R(){ } }
本地测试
打开TJDriver.java,右击“Run as-àRun Configurations”
在ODPS Config选项卡,选择正确的ODPS项目。
在Arguments选项卡中,输入运行参数:dw_pt dw_tj,并点击“Run”,执行本地测试运行。
在第一次运行时,Eclipse会从MaxCompute中下载少量的测试数据用于测试。运行完成后,可以在Warehouse中看到测试用的输入数据和产生的结果数据。
打包并上传资源 在本地测试结果正确之后,就可以导出jar包了。在Eclipse下执行“FileàExport”,选择导出“JAR File”,导出至本地。
在odpscmd下,执行添加资源的命令,将jar上传至MaxCompute。
add jar d:/Clouder/jar/TJ.jar;
MaxCompute上执行程序 在odpscmd下,执行jar命令,运行程序。 jar -resources TJ.jar -classpath d:\Clouder\jar\TJ.jar TJDriver dw_pt dw_tj;
课后作业
3.1 课后作业 传统的MapReduce模型要求每一轮MapReduce操作之后,数据必须落地到分布式文件系统上(比如HDFS或 MaxCompute 表)。而一般的MapReduce应用通常由多个MapReduce作业组成,每个作业结束之后需要写入磁盘,接下去的Map任务很多情况下只是读一遍数据,为后续的Shuffle阶段做准备,这样其实造成了冗余的IO操作。MaxCompute 的计算调度逻辑可以支持更复杂编程模型,针对上面的那种情况,可以在Reduce后面直接执行下一次的Reduce操作,而不需要中间插入一个Map操作。基于此,MaxCompute 提供了扩展的MapReduce模型,即可以支持Map后连接任意多个Reduce操作,比如Map->Reduce->Reduce。
本次课程的课后作业,要求将谱图计算过程和统计特征的计算过程合并,使用Map——Reduce——Reduce的方式实现,一次读取表,中间结果保持在内存中,直接产生dw_tj结果表。可以使用pipeline对多次Reduce进行连接。
MapReduce Driver的参考代码如下:
import com.aliyun.odps.OdpsException; import com.aliyun.odps.OdpsType; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.pipeline.Pipeline; import com.aliyun.odps.Column; import com.aliyun.odps.mapred.Job; public class TJDriver { public static void main(String[] args) throws OdpsException { Job job = new Job(); Pipeline pipeline = Pipeline.builder() .addMapper(PTMapper.class) .setOutputKeySchema( new Column[] { new Column("wid", OdpsType.BIGINT) }) .setOutputValueSchema( new Column[] { new Column("PID", OdpsType.STRING), new Column("wid", OdpsType.BIGINT), new Column("maxv", OdpsType.BIGINT) }) .setOutputKeySortColumns(new String[] { "wid" }) .setPartitionColumns(new String[] { "wid" }) .setOutputGroupingColumns(new String[] { "wid" }) .addReducer(PTRduecer.class) .setOutputKeySchema( new Column[] { new Column("pid", OdpsType.STRING) }) .setOutputValueSchema( new Column[] { new Column("wid", OdpsType.BIGINT), new Column("totalq", OdpsType.BIGINT), new Column("avgq", OdpsType.BIGINT), new Column("nums", OdpsType.BIGINT), new Column("maxv", OdpsType.BIGINT)}) .addReducer(TJReducer.class).createPipeline(); // TODO: specify map output types job.setPipeline(pipeline); job.addInput(TableInfo.builder().tableName(args[0]).build()); job.addOutput(TableInfo.builder().tableName(args[1]).build()); job.submit(); job.waitForCompletion(); System.exit(job.isSuccessful() == true ? 0 : 1); } }