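I. Introduction
Alink is a general-purpose algorithm platform built on Flink.
1.1 Data clustering overview
1. The dataset is described by five fields, each with a name and a data type: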
sepal_length double, sepal_width double, petal_length double, petal_width double, category string
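2. The controlling feature field is petal_width.
3. The aggregation (grouping) field is category.
4. The main feature fields used for clustering are: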
sepal_length double, sepal_width double, petal_length double, petal_width double
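To make the field list above concrete, here is a minimal plain-Java sketch that splits the schema string into name/type pairs and picks out the numeric feature columns. The class SchemaSketch and its two helper methods are hypothetical names used only for illustration; they are not part of Alink, which parses the schema string internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not an Alink class: illustrates how the schema string is structured.
class SchemaSketch {

    // Parses "name type, name type, ..." into an ordered name -> type map.
    static Map<String, String> parseSchema(String schemaStr) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String field : schemaStr.split(",")) {
            String[] parts = field.trim().split("\\s+");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Bad field: " + field);
            }
            columns.put(parts[0], parts[1].toLowerCase());
        }
        return columns;
    }

    // Treats every double-typed column as a feature; other columns (here: category) are left out.
    static List<String> featureColumns(Map<String, String> columns) {
        List<String> features = new ArrayList<>();
        for (Map.Entry<String, String> entry : columns.entrySet()) {
            if ("double".equals(entry.getValue())) {
                features.add(entry.getKey());
            }
        }
        return features;
    }

    public static void main(String[] args) {
        String schema = "sepal_length double, sepal_width double, "
                + "petal_length double, petal_width double, category string";
        Map<String, String> columns = parseSchema(schema);
        System.out.println("columns:  " + columns.keySet());
        System.out.println("features: " + featureColumns(columns));
    }
}
```

The four double columns are the ones that end up in the feature vector, while category is kept only as a label to compare against the cluster assignments.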
II. Java API
2.1 Alink-KMeans.java
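package com.wang.flink.alink;

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.clustering.KMeans;
import com.alibaba.alink.pipeline.dataproc.vector.VectorAssembler;

public class AlinkDemo {
    public static void main(String[] args) throws Exception {
        String URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
        String SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

        // Read the iris CSV file as a batch source with the schema above.
        BatchOperator<?> data = new CsvSourceBatchOp()
                .setFilePath(URL)
                .setSchemaStr(SCHEMA_STR);

        // Assemble the four numeric columns into a single vector column named "features".
        VectorAssembler va = new VectorAssembler()
                .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
                .setOutputCol("features");

        // Cluster the feature vectors into 3 groups, keeping "category" in the output for comparison.
        KMeans kMeans = new KMeans()
                .setVectorCol("features")
                .setK(3)
                .setPredictionCol("prediction_result")
                .setPredictionDetailCol("prediction_detail")
                .setReservedCols("category")
                .setMaxIter(100);

        // Chain both stages, train on the data, then print the predictions for the same data.
        Pipeline pipeline = new Pipeline().add(va).add(kMeans);
        pipeline.fit(data).transform(data).print();
    }
}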
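To show what the KMeans stage's setK and setMaxIter parameters control, the following is a minimal plain-Java sketch of Lloyd's k-means iteration. It is not Alink's implementation: the class KMeansSketch, the naive choice of the first k points as initial centroids, and the toy 2-D points are all assumptions made for illustration.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch of Lloyd's algorithm; Alink's KMeans is distributed and differs in detail.
class KMeansSketch {

    // Squared Euclidean distance between two vectors of equal length.
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Index of the centroid closest to the given point.
    static int nearest(double[] point, double[][] centroids) {
        int best = 0;
        for (int c = 1; c < centroids.length; c++) {
            if (squaredDistance(point, centroids[c]) < squaredDistance(point, centroids[best])) {
                best = c;
            }
        }
        return best;
    }

    // Returns the cluster index of every point; requires points.length >= k.
    static int[] cluster(double[][] points, int k, int maxIter) {
        int dim = points[0].length;
        double[][] centroids = new double[k][];
        for (int c = 0; c < k; c++) {
            centroids[c] = points[c].clone(); // naive initialization (assumption)
        }
        int[] assignment = new int[points.length];
        Arrays.fill(assignment, -1);

        for (int iter = 0; iter < maxIter; iter++) {
            // Assignment step: attach each point to its nearest centroid.
            boolean changed = false;
            for (int i = 0; i < points.length; i++) {
                int c = nearest(points[i], centroids);
                if (c != assignment[i]) {
                    assignment[i] = c;
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged before reaching maxIter
            }
            // Update step: move each centroid to the mean of its assigned points.
            double[][] sums = new double[k][dim];
            int[] counts = new int[k];
            for (int i = 0; i < points.length; i++) {
                counts[assignment[i]]++;
                for (int d = 0; d < dim; d++) {
                    sums[assignment[i]][d] += points[i][d];
                }
            }
            for (int c = 0; c < k; c++) {
                if (counts[c] > 0) {
                    for (int d = 0; d < dim; d++) {
                        centroids[c][d] = sums[c][d] / counts[c];
                    }
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Made-up toy points forming three obvious groups.
        double[][] points = {
            {1.0, 1.0}, {5.0, 5.0}, {9.0, 1.0},
            {1.2, 0.8}, {5.1, 4.9}, {9.2, 1.1}
        };
        System.out.println(Arrays.toString(cluster(points, 3, 100)));
    }
}
```

In the Alink pipeline the same two knobs appear as setK(3) and setMaxIter(100): the first fixes the number of clusters, the second caps the number of assignment/update rounds if the clustering has not settled earlier.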
2.2 pom
<dependency>
    <groupId>com.alibaba.alink</groupId>
    <artifactId>alink_core_flink-1.9_2.11</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
2.3 Packaging plugins
<build>
    <plugins>
        <!-- Compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.12</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Jar packaging plugin (bundles all dependencies) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Entry-point class of the jar (optional) -->
                        <mainClass>com.wang.flink.alink.AlinkDemo</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
III. Clustering results from a local run
IV. Uploading to the Flink cluster
4.1 Build the package: mvn package
4.2 Upload to Linux
sftp> put D:/APP/IDEA/workplace/FlinkTurbineFaultDiagnosis/target/Flink-TurbineFaultDiagnosis-1
4.3 Run the job on Flink
bin/flink run -p 1 -c com.wang.flink.alink.AlinkDemo /root/Flink-TurbineFaultDiagnosis-1.0-SNAPSHOT-jar-with-dependencies.jar
4.4 Flink cluster overview
Open the cluster node's web UI: http://202.206.212.189:8081/
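The same overview can also be read programmatically. The sketch below assumes the cluster exposes Flink's monitoring REST API on the same port as the web UI and queries its /overview endpoint with plain Java 8 classes. The class FlinkOverviewSketch and its methods are hypothetical names, and the request only succeeds when the host is reachable from where the code runs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: fetches the cluster overview JSON from Flink's REST API.
class FlinkOverviewSketch {

    // Builds the overview URL for a JobManager host and REST port.
    static String overviewUrl(String host, int port) {
        return "http://" + host + ":" + port + "/overview";
    }

    // Performs a GET request and returns the response body as a string.
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000); // milliseconds
        conn.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = overviewUrl("202.206.212.189", 8081);
        try {
            System.out.println(fetch(url));
        } catch (IOException e) {
            System.out.println("Could not reach " + url + ": " + e.getMessage());
        }
    }
}
```

The response is a small JSON document summarizing task managers, slots, and job counts, which is convenient for scripted health checks after submitting the jar.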