Flink的这些事(二)——Flink开发环境搭建

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: Flink开发环境搭建

IEDA开发环境

1、安装java环境

参考上一篇文章Flink的这些事(一)——Flink部署

2、安装maven

参考博客Maven安装与配置

3、配置IDEA

参考博客如何使用IntelliJ IDEA 配置Maven

4、pom文件设置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink</groupId>
    <artifactId>flink-dev</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.7.6</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

5、代码示例


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: qincf
 * Date: 2018/11/02
 * Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来
 *       先在目标主机1.1.1.1机器上执行nc -l 9000
 */
public class StreamingWindowWordCount {
    public static void main(String[] args) throws Exception {
        //定义socket的端口号
        int port;
        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("没有指定port参数,使用默认值9000");
            port = 9000;
        }
        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");
        //计算数据
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        })//打平操作,把每行的单词转为<word,count>类型的数据
                //针对相同的word数据进行分组
                .keyBy("word")
                //指定计算数据的窗口大小和滑动窗口大小
                .timeWindow(Time.seconds(2),Time.seconds(1))
                .sum("count");
        //获取可视化JSON
        System.out.println(env.getExecutionPlan());
        //把数据打印到控制台,使用一个并行度
        windowCount.print().setParallelism(1);
        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming word count");



    }

    /**
     * 主要为了存储单词以及单词出现的次数
     */
    public static class WordWithCount{
        public String word;
        public long count;
        public WordWithCount(){}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

}

6、测试步骤

首先在1.1.1.1机器上使用nc命令模拟数据发送

nc -l 9000

然后在IEDA中运营StreamingWindowWordCount程序
在主机上输入字符

[root@data01]# nc -l 9000
a
a
b
c
d
d

此时运行程序后,IDEA中会打印处结果

E:\tools\Java\bin\java.exe "-javaagent:E:\tools\IDEA\IntelliJ IDEA Community Edition 2018.2.5\lib\idea_rt.jar=61830:E:\tools\IDEA\IntelliJ IDEA Community Edition 2018.2.5\bin" -Dfile.encoding=UTF-8 -classpath E:\tools\Java\jre\lib\charsets.jar;E:\tools\Java\jre\lib\deploy.jar;E:\tools\Java\jre\lib\ext\access-bridge-64.jar;E:\tools\Java\jre\lib\ext\cldrdata.jar;E:\tools\Java\jre\lib\ext\dnsns.jar;E:\tools\Java\jre\lib\ext\jaccess.jar;E:\tools\Java\jre\lib\ext\jfxrt.jar;E:\tools\Java\jre\lib\ext\localedata.jar;E:\tools\Java\jre\lib\ext\nashorn.jar;E:\tools\Java\jre\lib\ext\sunec.jar;E:\tools\Java\jre\lib\ext\sunjce_provider.jar;E:\tools\Java\jre\lib\ext\sunmscapi.jar;E:\tools\Java\jre\lib\ext\sunpkcs11.jar;E:\tools\Java\jre\lib\ext\zipfs.jar;E:\tools\Java\jre\lib\javaws.jar;E:\tools\Java\jre\lib\jce.jar;E:\tools\Java\jre\lib\jfr.jar;E:\tools\Java\jre\lib\jfxswt.jar;E:\tools\Java\jre\lib\jsse.jar;E:\tools\Java\jre\lib\management-agent.jar;E:\tools\Java\jre\lib\plugin.jar;E:\tools\Java\jre\lib\resources.jar;E:\tools\Java\jre\lib\rt.jar;E:\code\flink\target\classes;E:\tools\Maven-Repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;E:\tools\Maven-Repository\org\apache\flink\flink-java\1.6.1\flink-java-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-core\1.6.1\flink-core-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-annotations\1.6.1\flink-annotations-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-metrics-core\1.6.1\flink-metrics-core-1.6.1.jar;E:\tools\Maven-Repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;E:\tools\Maven-Repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;E:\tools\Maven-Repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;E:\tools\Maven-Repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;E:\tools\Maven-Repository\org\apache\commons\commons-compress\1.4.1\commons-compress-1.4.1.jar;E:\tools\Maven-Repository\org\tukaani\xz\1.0\xz-1.0.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-asm\5.0.4-4.0\flink-shaded-asm-5.0.4-4.0.jar;E:\tools\Maven-Repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;E:\tools\Maven-Repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;E:\tools\Maven-Repository\org\slf4j\slf4j-api\1.7.7\slf4j-api-1.7.7.jar;E:\tools\Maven-Repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;E:\tools\Maven-Repository\org\apache\flink\force-shading\1.6.1\force-shading-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-streaming-java_2.11\1.6.1\flink-streaming-java_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-runtime_2.11\1.6.1\flink-runtime_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-queryable-state-client-java_2.11\1.6.1\flink-queryable-state-client-java_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-hadoop-fs\1.6.1\flink-hadoop-fs-1.6.1.jar;E:\tools\Maven-Repository\commons-io\commons-io\2.4\commons-io-2.4.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-netty\4.1.24.Final-4.0\flink-shaded-netty-4.1.24.Final-4.0.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-jackson\2.7.9-4.0\flink-shaded-jackson-2.7.9-4.0.jar;E:\tools\Maven-Repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-actor_2.11\2.4.20\akka-actor_2.11-2.4.20.jar;E:\tools\Maven-Repository\com\typesafe\config\1.3.0\config-1.3.0.jar;E:\tools\Maven-Repository\org\scala-lang\modules\scala-java8-compat_2.11\0.7.0\scala-java8-compat_2.11-0.7.0.jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-stream_2.11\2.4.20\akka-stream_2.11-2.4.20.jar;E:\tools\Maven-Repository\org\reactivestreams\reactive-streams\1.0.0\reactive-streams-1.0.0.jar;E:\tools\Maven-Repository\com\typesafe\ssl-config-core_2.11\0.2.1\ssl-config-core_2.11-0.2.1.jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-protobuf_2.11\2.4.20\akka-protobuf_2.11-2.4.20.jar;E:\tools\Maven-Repository\com\typesafe\akka\akka-slf4j_2.11\2.4.20\akka-slf4j_2.11-2.4.20.jar;E:\tools\Maven-Repository\org\clapper\grizzled-slf4j_2.11\1.0.2\grizzled-slf4j_2.11-1.0.2.jar;E:\tools\Maven-Repository\com\github\scopt\scopt_2.11\3.5.0\scopt_2.11-3.5.0.jar;E:\tools\Maven-Repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;E:\tools\Maven-Repository\com\twitter\chill_2.11\0.7.4\chill_2.11-0.7.4.jar;E:\tools\Maven-Repository\com\twitter\chill-java\0.7.4\chill-java-0.7.4.jar;E:\tools\Maven-Repository\org\apache\flink\flink-shaded-guava\18.0-4.0\flink-shaded-guava-18.0-4.0.jar;E:\tools\Maven-Repository\org\apache\flink\flink-scala_2.11\1.6.1\flink-scala_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;E:\tools\Maven-Repository\org\scala-lang\scala-compiler\2.11.12\scala-compiler-2.11.12.jar;E:\tools\Maven-Repository\org\scala-lang\modules\scala-xml_2.11\1.0.5\scala-xml_2.11-1.0.5.jar;E:\tools\Maven-Repository\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.4\scala-parser-combinators_2.11-1.0.4.jar;E:\tools\Maven-Repository\org\apache\flink\flink-streaming-scala_2.11\1.6.1\flink-streaming-scala_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-table_2.11\1.6.1\flink-table_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-clients_2.11\1.6.1\flink-clients_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-optimizer_2.11\1.6.1\flink-optimizer_2.11-1.6.1.jar;E:\tools\Maven-Repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-connector-kafka-0.10_2.11\1.6.1\flink-connector-kafka-0.10_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-connector-kafka-0.9_2.11\1.6.1\flink-connector-kafka-0.9_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\flink\flink-connector-kafka-base_2.11\1.6.1\flink-connector-kafka-base_2.11-1.6.1.jar;E:\tools\Maven-Repository\org\apache\kafka\kafka-clients\0.10.2.1\kafka-clients-0.10.2.1.jar;E:\tools\Maven-Repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-client\2.7.6\hadoop-client-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-common\2.7.6\hadoop-common-2.7.6.jar;E:\tools\Maven-Repository\com\google\guava\guava\11.0.2\guava-11.0.2.jar;E:\tools\Maven-Repository\xmlenc\xmlenc\0.52\xmlenc-0.52.jar;E:\tools\Maven-Repository\commons-httpclient\commons-httpclient\3.1\commons-httpclient-3.1.jar;E:\tools\Maven-Repository\commons-codec\commons-codec\1.4\commons-codec-1.4.jar;E:\tools\Maven-Repository\commons-net\commons-net\3.1\commons-net-3.1.jar;E:\tools\Maven-Repository\org\mortbay\jetty\jetty-sslengine\6.1.26\jetty-sslengine-6.1.26.jar;E:\tools\Maven-Repository\javax\servlet\jsp\jsp-api\2.1\jsp-api-2.1.jar;E:\tools\Maven-Repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;E:\tools\Maven-Repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;E:\tools\Maven-Repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;E:\tools\Maven-Repository\commons-configuration\commons-configuration\1.6\commons-configuration-1.6.jar;E:\tools\Maven-Repository\commons-digester\commons-digester\1.8\commons-digester-1.8.jar;E:\tools\Maven-Repository\commons-beanutils\commons-beanutils\1.7.0\commons-beanutils-1.7.0.jar;E:\tools\Maven-Repository\commons-beanutils\commons-beanutils-core\1.8.0\commons-beanutils-core-1.8.0.jar;E:\tools\Maven-Repository\org\slf4j\slf4j-log4j12\1.7.10\slf4j-log4j12-1.7.10.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-core-asl\1.9.13\jackson-core-asl-1.9.13.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-mapper-asl\1.9.13\jackson-mapper-asl-1.9.13.jar;E:\tools\Maven-Repository\org\apache\avro\avro\1.7.4\avro-1.7.4.jar;E:\tools\Maven-Repository\com\thoughtworks\paranamer\paranamer\2.3\paranamer-2.3.jar;E:\tools\Maven-Repository\com\google\protobuf\protobuf-java\2.5.0\protobuf-java-2.5.0.jar;E:\tools\Maven-Repository\com\google\code\gson\gson\2.2.4\gson-2.2.4.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-auth\2.7.6\hadoop-auth-2.7.6.jar;E:\tools\Maven-Repository\org\apache\httpcomponents\httpclient\4.2.5\httpclient-4.2.5.jar;E:\tools\Maven-Repository\org\apache\httpcomponents\httpcore\4.2.4\httpcore-4.2.4.jar;E:\tools\Maven-Repository\org\apache\directory\server\apacheds-kerberos-codec\2.0.0-M15\apacheds-kerberos-codec-2.0.0-M15.jar;E:\tools\Maven-Repository\org\apache\directory\server\apacheds-i18n\2.0.0-M15\apacheds-i18n-2.0.0-M15.jar;E:\tools\Maven-Repository\org\apache\directory\api\api-asn1-api\1.0.0-M20\api-asn1-api-1.0.0-M20.jar;E:\tools\Maven-Repository\org\apache\directory\api\api-util\1.0.0-M20\api-util-1.0.0-M20.jar;E:\tools\Maven-Repository\org\apache\curator\curator-framework\2.7.1\curator-framework-2.7.1.jar;E:\tools\Maven-Repository\org\apache\curator\curator-client\2.7.1\curator-client-2.7.1.jar;E:\tools\Maven-Repository\org\apache\curator\curator-recipes\2.7.1\curator-recipes-2.7.1.jar;E:\tools\Maven-Repository\org\apache\htrace\htrace-core\3.1.0-incubating\htrace-core-3.1.0-incubating.jar;E:\tools\Maven-Repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-hdfs\2.7.6\hadoop-hdfs-2.7.6.jar;E:\tools\Maven-Repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;E:\tools\Maven-Repository\io\netty\netty\3.6.2.Final\netty-3.6.2.Final.jar;E:\tools\Maven-Repository\io\netty\netty-all\4.0.23.Final\netty-all-4.0.23.Final.jar;E:\tools\Maven-Repository\xerces\xercesImpl\2.9.1\xercesImpl-2.9.1.jar;E:\tools\Maven-Repository\xml-apis\xml-apis\1.3.04\xml-apis-1.3.04.jar;E:\tools\Maven-Repository\org\fusesource\leveldbjni\leveldbjni-all\1.8\leveldbjni-all-1.8.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-app\2.7.6\hadoop-mapreduce-client-app-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-common\2.7.6\hadoop-mapreduce-client-common-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-client\2.7.6\hadoop-yarn-client-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-server-common\2.7.6\hadoop-yarn-server-common-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-shuffle\2.7.6\hadoop-mapreduce-client-shuffle-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-api\2.7.6\hadoop-yarn-api-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-core\2.7.6\hadoop-mapreduce-client-core-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-yarn-common\2.7.6\hadoop-yarn-common-2.7.6.jar;E:\tools\Maven-Repository\javax\xml\bind\jaxb-api\2.2.2\jaxb-api-2.2.2.jar;E:\tools\Maven-Repository\javax\xml\stream\stax-api\1.0-2\stax-api-1.0-2.jar;E:\tools\Maven-Repository\javax\activation\activation\1.1\activation-1.1.jar;E:\tools\Maven-Repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;E:\tools\Maven-Repository\com\sun\jersey\jersey-core\1.9\jersey-core-1.9.jar;E:\tools\Maven-Repository\com\sun\jersey\jersey-client\1.9\jersey-client-1.9.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-jaxrs\1.9.13\jackson-jaxrs-1.9.13.jar;E:\tools\Maven-Repository\org\codehaus\jackson\jackson-xc\1.9.13\jackson-xc-1.9.13.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-mapreduce-client-jobclient\2.7.6\hadoop-mapreduce-client-jobclient-2.7.6.jar;E:\tools\Maven-Repository\org\apache\hadoop\hadoop-annotations\2.7.6\hadoop-annotations-2.7.6.jar;E:\tools\Maven-Repository\mysql\mysql-connector-java\5.1.38\mysql-connector-java-5.1.38.jar;E:\tools\Maven-Repository\com\alibaba\fastjson\1.2.22\fastjson-1.2.22.jar StreamingWindowWordCount
没有指定port参数,使用默认值9000
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":8,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","parallelism":8,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]}]}
WordWithCount{word='a', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='a', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='b', count=1}

大家会看到,wordcount的结果。
仔细看还有一串json输出,这部分是什么呢?
代码中加了一个打印执行计划的部分:

/获取可视化JSON
System.out.println(env.getExecutionPlan());

Flink提供了一个可视化执行计划的结果,类似Spark的DAG图,把json粘贴到Flink Plan Visualizer可以看到执行计划图:
image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
流计算 资源调度 Java
Apache Flink 零基础入门(二):开发环境搭建和应用的配置、部署及运行
本文主要面向于初次接触 Flink、或者对 Flink 有了解但是没有实际操作过的同学。希望帮助大家更顺利地上手使用 Flink,并着手相关开发调试工作。
Apache Flink 零基础入门(二):开发环境搭建和应用的配置、部署及运行
|
6月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
660 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
4125 74
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
zdl
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
655 56
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
834 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
人工智能 Apache 流计算
Flink Forward Asia 2024 上海站|探索实时计算新边界
Flink Forward Asia 2024 即将盛大开幕!11 月 29 至 30 日在上海举行,大会聚焦 Apache Flink 技术演进与未来规划,涵盖流式湖仓、流批一体、Data+AI 融合等前沿话题,提供近百场专业演讲。立即报名,共襄盛举!官网:https://asia.flink-forward.org/shanghai-2024/
1455 33
Flink Forward Asia 2024 上海站|探索实时计算新边界

热门文章

最新文章