1. pom.xml 文件
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the flink-sql example project.
  Declares JUnit for tests plus the Flink batch, streaming, client and
  Table API modules, and pins the versions of the standard lifecycle plugins.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>flink-sql</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>flink-sql</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <!-- Single source of truth for the Flink release: every Flink module below
         must resolve to the same version, so it is declared once here. -->
    <flink.version>1.2.0</flink.version>
    <!-- Scala binary suffix shared by all Scala-dependent Flink artifacts;
         declared once so the suffixes cannot drift apart. -->
    <scala.binary.version>2.10</scala.binary.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <!-- Flink modules: all share ${flink.version}. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

  <build>
    <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
    <pluginManagement>
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
2. Flink SQL WordCount
package org.example;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.util.ArrayList;

/**
 * Word count over a fixed sentence, with the aggregation expressed as a SQL
 * query against a table registered in a batch table environment.
 *
 * Author  : Jackson
 * Version : 2020/4/21 & 1.0
 */
public class SqlWordCount {

    /**
     * Builds the input records, registers them as table "WordCount", runs the
     * grouping query and prints the resulting rows via {@code printToErr()}.
     *
     * @param args command-line arguments (not read)
     * @throws Exception propagated from the Flink calls
     */
    public static void main(String[] args) throws Exception {
        // Set up the batch execution context and the table environment bound to it.
        final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(execEnv);

        final ArrayList<WC> records = toRecords("hello Jackson hello flink");
        final DataSource<WC> source = execEnv.fromCollection(records);

        // Convert the DataSet into a Table, naming the fields explicitly.
        final Table inputTable = tableEnv.fromDataSet(source, "word,count1");
        //inputTable.printSchema();

        // Register the table under a name so the SQL query can reference it.
        tableEnv.registerTable("WordCount", inputTable);
        final Table aggregated = tableEnv.sql("select word as word,sum(count1) as count1 from WordCount GROUP BY word");

        // Convert the result table back into a DataSet of WC and print it.
        final DataSet<WC> output = tableEnv.toDataSet(aggregated, WC.class);
        output.printToErr();
    }

    /**
     * Splits the text on runs of non-word characters and wraps every token in
     * a {@link WC} carrying an initial count of 1.
     */
    private static ArrayList<WC> toRecords(String text) {
        final String[] tokens = text.split("\\W+");
        final ArrayList<WC> records = new ArrayList<>();
        for (int i = 0; i < tokens.length; i++) {
            records.add(new WC(tokens[i], 1));
        }
        return records;
    }

    /**
     * A (word, count) pair with public fields.
     */
    public static class WC {
        public String word;
        public long count1;

        // NOTE(review): presumably kept so the framework can instantiate the
        // type reflectively - confirm before removing.
        public WC() {
        }

        public WC(String word, long count1) {
            this.word = word;
            this.count1 = count1;
        }

        /** Renders the pair as {@code word,count1}. */
        @Override
        public String toString() {
            return new StringBuilder().append(word).append(",").append(count1).toString();
        }
    }
}
3.结果
代码:
zhugezifang/flink-sql (github.com)
参考:
Apache Flink 1.3 Documentation: SQL