初识Flink

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 初识Flink

Flink是什么

Flink 是 Apache 基金会旗下的一个开源大数据处理框架。目前,Flink 已经成为各大公司

大数据实时处理的发力重点,特别是国内以阿里为代表的一众互联网大厂都在全力投入,为

Flink 社区贡献了大量源码。如今 Flink 已被很多人认为是大数据实时处理的方向和未来,许多

公司也都在招聘和储备掌握 Flink 技术的人才。

Flink 的主要应用场景,就是处理大规模的数据流。那为什么一定要用 Flink

呢?数据处理还有没有其他的方式?

批处理和流处理

数据处理有不同的方式。

对于具体应用来说,有些场景数据是一个一个来的,是一组有序的数据序列,我们把它叫

作“数据流”;而有些场景的数据,本身就是一批同时到来,是一个有限的数据集,这就是批

量数据(有时也直接叫数据集)。

容易想到,处理数据流,当然应该“来一个就处理一个”,这种数据处理模式就叫作流处理;因为这种处理是即时的,所以也叫实时处理。与之对应,处理批量数据自然就应该一批读

入、一起计算,这种方式就叫作批处理,也叫作离线处理。

Flink 的核心特性

Flink 区别与传统数据处理框架的特性如下。

  • 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。
  • 结果的准确性。Flink 提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证。
  • 可以连接到最常用的存储系统,如 Apache Kafka、Apache Cassandra、Elasticsearch、JDBC、Kinesis 和(分布式)文件系统,如 HDFS 和 S3。
  • 高可用。本身高可用的设置,加上与 K8s,YARN 和Mesos 的紧密集成,再加上从故 障中快速恢复和动态扩展任务的能力,Flink 能做到以极少的停机时间 7×24 全天候 运行。
  • 能够更新应用程序代码并将作业(jobs)迁移到不同的 Flink 集群,而不会丢失应用 程序的状态。

分层 API

最底层级的抽象仅仅提供了有状态流,它将处理函数(Process Function)嵌入到了

DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某

些操作进行抽象,它允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态

具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以

处理复杂的计算。


实际上,大多数应用并不需要上述的底层抽象,而是直接针对核心 API(Core APIs) 进行编程,比如 DataStream API(用于处理有界或无界流数据)以及 DataSet API(用于处理有界数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些 API 处理的数据类型以类(classes)的形式由各自的编程语言所表示。


Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。Table API 遵

循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比

较的操作,例如 select、join、group-by、aggregate 等。


尽管 Table API 可以通过多种类型的用户自定义函数(UDF)进行扩展,仍不如核心 API

更具表达能力,但是使用起来代码量更少,更加简洁。除此之外,Table API 程序在执行之前

会使用内置优化器进行优化。


我们可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与

DataStream 以及 DataSet 混合使用。


Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,

但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询

可以直接在 Table API 定义的表上执行。


目前 Flink SQL 和 Table API 还在开发完善的过程中,很多大厂都会二次开发符合自己需

要的工具包。而 DataSet 作为批处理 API 实际应用较少,2020 年 12 月 8 日发布的新版本 1.12.0,

已经完全实现了真正的流批一体,DataSet API 已处于软性弃用(soft deprecated)的状态。用

Data Stream API 写好的一套代码, 即可以处理流数据, 也可以处理批数据,只需要设置不同的

执行模式。这与之前版本处理有界流的方式是不一样的,Flink 已专门对批处理数据做了优化

处理。本书中以介绍 DataStream API 为主,采用的是目前最新版本 Flink 1.13.0。

HelloWorld快速上手

创建maven项目添加依赖

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    <dependencies>
        <!-- 引入 Flink 相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

在resources下添加log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

在当前项目下创建input目录,新建文件words.txt

hello world
hello flink
hello java

最原始的批处理,不建议使用

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
    public static void main(String[] args) throws Exception{
        //创建环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //读取文件
        DataSource<String> words = env.readTextFile("input/words.txt");
        //转换为二元组
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = words.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            for (String word : line.split(" ")) {
                out.collect(Tuple2.of(word, 1l));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //按照key进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
        //聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
        //打印结果
        sum.print();
    }
}

最新api接口的批处理

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);
        result.print();
        env.execute();
    }
}

实时流式处理,由于是试试,我们通过给端口发数据模拟实时数据

nc -lk 7777
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineDSS = env.socketTextStream("localhost",
                7777);
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        System.out.println("-----------");
        result.print();
        env.execute();
    }
}

结语

感谢尚硅谷

推荐链接:https://www.bilibili.com/video/BV133411s7Sa?p=1

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
分布式计算 流计算 Spark
Flink - CountTrigger && ProcessingTimeTriger 详解
Flink 针对 window 提供了多种自定义 trigger,其中常见的有 CountTrigger 和 ProcessingTimeTrigger,下面通过两个 demo 了解一下两个 Trigger 的内部实现原理与窗口触发的相关知识。
1187 0
|
9月前
|
SQL 数据挖掘 关系型数据库
初识Flink
阿里云实时计算Flink版是一款全托管Serverless的Flink云服务,基于Apache Flink构建一站式实时大数据分析平台,提供端到端亚秒级实时数据分析能力,并通过标准SQL降低业务开发门槛,助力企业向实时化、智能化大数据【2月更文挑战第3天】
69 2
|
Java Linux 网络安全
flink快速开始
flink快速开始
57 1
|
9月前
|
流计算
Flink Exactly-Once
Flink Exactly-Once
60 0
|
SQL 存储 算法
深入解读 Flink 1.17
阿里云技术专家,Apache Flink PMC Member & Committer、Flink CDC Maintainer 徐榜江(雪尽) 在深入解读 Flink 1.17 的分享。
6700 0
深入解读 Flink 1.17
|
消息中间件 Kafka 流计算
flink的TimeCharacteristic
flink的TimeCharacteristic
164 0
|
存储 算法 测试技术
|
SQL 消息中间件 分布式计算
Flink的重要特点
Flink的重要特点
207 0
Flink的重要特点
flink
flink
94 0
|
SQL 消息中间件 分布式计算
【Flink】(一)初识 Flink
【Flink】(一)初识 Flink
203 0
【Flink】(一)初识 Flink