2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)(JianYi收藏)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)(JianYi收藏)

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

下面为大家带来阿里巴巴极度热推的Flink,实时数仓是未来的方向,学好Flink,月薪过万不是梦!!

相关教程直通车:

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二)

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)

2021年最新最全Flink系列教程__Flink高级API(三)

day01_Flink概述、安装部署和入门案例

今日目标

  • Flink概述(了解)
  • Flink安装部署(会部署)
  • Flink入门案例(会操作)

Flink概述

什么是批处理和流处理

  • 批处理,基于周期的数据一批批处理(数据采集、数据ETL、数据统计分析挖掘、报表展示)
  • 流处理,实时的来一条处理一条。

为什么需要流计算

  • 流处理应用场景
  • 实时监控
  • 实时大屏、实时分析
  • 实时数据仓库

Flink的发展史

  • 2009年柏林工业大学一个研究项目
  • 2014年被贡献给 apache 成为顶级项目,Flink 计算的主流方向是流式处理
  • 2019年flink 商业公司被阿里收购,Flink 迎来了快速的发展

Flink的官方介绍

  • Flink 是 Java 开发的,通信机制使用 akka ,数据的交换是 netty
  • Flink 推荐使用 Java 、 scala 、 python

Flink组件栈

  • 部署层
    local 单机;
    集群部署(standalone 、 yarn 、mesos、k8s);
    云部署 (阿里云、腾讯云、亚马逊云等)
  • 运行层 runtime
    StreamingGraph 流图
    jobGraph
    ExecuteGraph
  • API
    DataSet api (软弃用) ,高版本中 全部弃用
    DataStream API
  • 类库
    FlinkML Gelly(图计算)

Flink 中批处理是流处理的一种特例。

Flink基石

  • 检查点 checkpoint 轻量级
  • 状态 state keyedstate operatorstate
  • 时间 time EventTime(业务时间) ingestion time(摄取时间) processing time(处理时间)
  • 窗口 windows 滚动时间、滑动时间、会话窗口、计数窗口

Flink的应用场景

  • 常用的应用

  • 应用三个场景
  1. stream pipeline 流管线
  2. 批/流分析
  3. 基于事件驱动

Flink的安装部署

Local本地安装



  • 启动 flink
[root@node1 bin]$ start-cluster.sh

  • Flink web UI页面
node1:8081
  • 执行 wordcount 脚本
[root@node1 flink]$ bin/flink run /opt/server/flink/examples/batch/WordCount.jar --input /root/words.txt

Standalone独立集群安装


Standalone-HA高可用集群模式

  • zookeeper 选举
  • HDFS

  • 配置
  1. node1 上配置 jobmanager.rpc node2上配置 jobmanager.rpc
  2. zookeeper 的地址
  3. checkpoint 的hdfs的路径
  4. zookeeper 恢复的数据在hdfs上的路径
  5. workers 每一行一个hostname
  • 启动
  1. 启动 hdfs
hadoop-daemon start namenode
hadoop-daemon start datanode
hadoop-daemon start secondarynamenode
  1. zookeeper ,每一台都启动,集群模式
/export/server/zookeeper/bin/zkServer.sh start
  1. Flink启动
start-cluster.sh

Flink on Yarn模式

  • yarn 的两种提交方式
  1. yarn-session
    对于大量的小文件集合处理数据使用 yarn-session
  2. per-job-session
    对于大的数据量 per-job
  • yarn-session 启动并执行任务
  1. 启动 yarn-session
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
  1. 执行任务在 yarn-session 中
/export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar
  • kill 掉 yarn-session
yarn application -kill application_1620078252622_0006
  • per-job 启动并执行任务 [ -m yarn-cluster ]
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar

千亿数据仓库实时项目

  • 实时通过大屏或者看板展示订单相关信息
  • 技术架构
  1. 数据源 MySQL、日志数据
  2. 日志采集工具Flume、CDC工具Canal(binlog日志变化)
  3. 消息队列 Kafka,数据仓库分层,ODS、DWD、DWS层,时间不受限
  4. 流式计算引擎 Flink
  5. 内存(缓存)数据库Redis ,保存维度数据
  6. 明细数据落到Hbase
  7. 建索引和SQL查询Phoenix
  8. 经过ETL或业务分析统计写回Kafka
  9. 时序数据库Druid加载Kafka中数据进行业务的统计
  10. 报表展示Superset或者echarts图表工具

Flink入门案例

Flink API

编程模型

  • source
  • transformation
  • sink

批处理案例

package cn.itcast.sz22.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Author itcast
 * Date 2021/5/4 15:25
 * 通过 flink 批处理实现 wordcount
 * 开发步骤:
 * 1. 获取环境变量
 * 2. 读取数据源
 * 3. 转换操作
 * 4. 将数据落地,打印到控制台
 * 5. 执行(流环境下)
 */
public class Wordcount {
    public static void main(String[] args) throws Exception {
        //获取环境变量
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //读取数据
        //1. 文件中读取
        //2. 获取本地的数据,开发测试用
        DataSource<String> source = env
                .fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");
        //3. 进行转换操作
        DataSet<String> flatMapDS = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        //3.2 转换成元素 map
        MapOperator<String, Tuple2<String, Integer>> mapDS = flatMapDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //3.3统计
        AggregateOperator<Tuple2<String, Integer>> result = mapDS.groupBy(0).sum(1);
        //4.打印输出
        result.print();
    }
}

流处理案例

package cn.itcast.sz22.day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Author itcast
 * Date 2021/5/4 15:55
 * 流式计算 wordcount 统计
 *  编码步骤
 *  1.准备环境-env
 *  2.准备数据-source
 *  3.处理数据-transformation
 *  4.输出结果-sink
 *  5.触发执行-execute
 */
public class Wordcount2 {
    public static void main(String[] args) throws Exception {
        //编码步骤
        //1.准备环境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        //2.准备数据-source
        DataStream<String> linesDS = env
                .fromElements("itcast hadoop spark","itcast hadoop spark","itcast hadoop","itcast");
        //3.处理数据-transformation
        SingleOutputStreamOperator<String> flatMap = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(t -> t.f0)
                .sum(1);
        //4.输出结果-sink
        result.print();
        //5.触发执行-execute
        env.execute();
    }
}

流处理-Lambda版本

package cn.itcast.sz22.day01;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Author itcast
 * Date 2021/5/4 16:05
 * 通过 java 的 lambda 表达式实现 wordcount
 */
public class Wordcount3 {
    public static void main(String[] args) throws Exception {
        //1. 获取环境变量
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. 读取数据源
        DataStreamSource<String> source = env.fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");
        //3. 转换操作
        // void flatMap(T value, Collector<O> out)
        DataStream<String> faltMapDS = source.flatMap((String value, Collector<String> out) ->
                Arrays.stream(value.split(" "))
                        .forEach(out::collect))
                .returns(Types.STRING);
        //O map(T value)
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = faltMapDS
                .map((word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(t -> t.f0).sum(1);
        //4. 将数据落地,打印到控制台
        result.print();
        //5. 执行(流环境下)
        env.execute();
    }
}

Flink原理初探

  • Flink的角色分配
  1. jobmanager
  2. taskmanager
  3. client
  • taskmanager 执行能力
  1. taskslot 静态的概念
  2. parallelism 并行度 动态概念
  • 每个节点就是一个 task 任务
    每个任务拆分成多个并行处理的任务,就叫子任务 subtask
  • 流图 StreamGraph 逻辑执行流图 DataFlow
    operator chain 操作链
  • JobGraph
    ExecuteGraph 物理执行计划
  • Event 事件 带有时间戳的
  • Operator 传递模式 : one to one 模式, redistributing模式

  • Flink之执行图

总结

以上便是2021年最新最全Flink系列教程_Flink概述、安装部署和入门案例(一)

愿你读过之后有自己的收获,如果有收获不妨一键三连一下~



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
111 0
|
3月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
101 0
|
3月前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
144 0
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
3月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
105 3
|
3月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
141 0
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
5月前
|
SQL Kubernetes 数据处理
实时计算 Flink版产品使用问题之如何把集群通过kubernetes进行部署
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
5月前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力