Flink x Zeppelin ,Hive Streaming 实战解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 1.11 正式发布已经三周了,其中最吸引我的特性就是 Hive Streaming。正巧 Zeppelin-0.9-preview2 也在前不久发布了,所以就写了一篇 Zeppelin 上的 Flink Hive Streaming 的实战解析。

作者:狄杰@蘑菇街

Flink 1.11 正式发布已经三周了,其中最吸引我的特性就是 Hive Streaming。正巧 Zeppelin-0.9-preview2 也在前不久发布了,所以就写了一篇 Zeppelin 上的 Flink Hive Streaming 的实战解析。本文主要从以下几部分跟大家分享:

  • Hive Streaming 的意义
  • Checkpoint & Dependency
  • 写入 Kafka
  • Hive Streaming Sink
  • Hive Streaming Source
  • Hive Temporal Table

Hive Streaming 的意义

很多同学可能会好奇,为什么 Flink 1.11 中,Hive Streaming 的地位这么高?它的出现,到底能给我们带来什么?

其实在大数据领域,一直存在两种架构 Lambda 和 Kappa:

  • Lambda 架构——流批分离,静态数据通过定时调度同步到 Hive 数仓,实时数据既会同步到 Hive,也会被实时计算引擎消费,这里就引出了一点问题。
  • 数据口径问题
  • 离线计算产出延时太大
  • 数据冗余存储
  • Kappa 架构——全部使用实时计算来产出数据,历史数据通过回溯消息的消费位点计算,同样也有很多的问题,毕竟没有一劳永逸的架构。
  • 消息中间件无法保留全部历史数据,同样数据都是行式存储,占用空间太大
  • 实时计算计算历史数据力不从心
  • 无法进行 Ad-Hoc 的分析

为了解决这些问题,行业内推出了实时数仓,解决了大部分痛点,但是还是有些地方力不从心。比如涉及到历史数据的计算怎么办?我想做 Ad-Hoc 的分析又怎么玩?所以行业内现在都是实时数仓与离线数仓并行存在,而这又带来了更多的问题:模型需要多份、数据产出不一致、历史数据的计算等等 。

而 Hive Streaming 的出现就可以解决这些问题!再也不用多套模型了;也不需要同一个指标因为涉及到历史数据,写一遍实时 SQL 再写一遍离线 SQL;Ad-Hoc 也能做了,怎么做?读 Hive Streaming 产出的表就行!

接下来,让我们从参数配置开始,接着流式的写入 Hive,再到流式的读取 Hive 表,最后再 Join 上 Hive 维表吧。这一整套流程都体验后,想必大家对 Hive Streaming 一定会有更深入的了解,更能够体会到它的作用。

Checkpoint & Dependency

因为只有在完成 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,所以,我们需要合理的去配置 Checkpoint,在 Zeppelin 中配置 Checkpoint 很简单。

%flink.conf

# checkpoint 配置

pipeline.time-characteristic EventTime
execution.checkpointing.interval 120000
execution.checkpointing.min-pause 60000
execution.checkpointing.timeout 60000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION

# 依赖jar包配置

flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0

又因为我们需要从 Kafka 中读取数据,所以将 Kafka 的依赖也加入进去了。

写入Kafka

我们的数据来自于天池数据集,是以 CSV 的格式存在于本地磁盘,所以需要先将他们写入 Kafka。

先建一下 CSV Source 和 Kafka Sink 的表:

%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS source_csv;
CREATE TABLE source_csv (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) WITH (
 'connector' = 'filesystem',
 'path' = 'file:///Users/dijie/Downloads/Cloud_Theme_Click/theme_click_log.csv',
 'format' = 'csv'
 
 )
%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string,
ts AS localtimestamp,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'theme_click_log',
'properties.bootstrap.servers' = '10.70.98.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'

)

因为注册的表即可以读又可以写,于是我在建表时将 Watermark 加上了;又因为源数据中的时间戳已经很老了,所以我这里采用当前时间减去5秒作为我的 Watermark。

大家可以看到,我在语句一开始指定了 SQL 方言为 Default,这是为啥呢?还有别的方言吗?别急,听我慢慢说。

其实在之前的版本,Flink 就已经可以和 Hive 打通,包括可以把表建在 Hive 上,但是很多语法和 Hive 不兼容,包括建的表在 Hive 中也无法查看,主要原因就是方言不兼容。所以,在 Flink 1.11 中,为了减少学习成本(语法不兼容),可以用 DDL 建 Hive 表并在 Hive 中查询,Flink 支持了方言,默认的就是 Default 了,就和之前一样,如果想建 Hive 表,并支持查询,请使用 Hive 方言,具体可以参考下方链接。

Hive 方言:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_catalog.html

再把数据从 CSV 中读取后写入 Kafka。

%flink.ssql(type=update)

insert into kafka_table select * from source_csv ;

再瞄一眼 Kafka,看看数据有没有被灌进去:

kafka.jpg

看来没问题,那么接下来让我们写入 Hive。

Hive Streaming Sink

建一个Hive Sink Table,记得将方言切换到 Hive,否则会有问题。

%flink.ssql
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS hive_table;
CREATE TABLE hive_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES (

 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 min',
 'sink.partition-commit.policy.kind'='metastore,success-file'

);

参数给大家稍微解释一下:

  • partition.time-extractor.timestamp-pattern:分区时间抽取器,与 DDL 中的分区字段保持一致;
  • sink.partition-commit.trigger:分区触发器类型,可选 process-time 或partition-time。process-time:不需要上面的参数,也不需要水印,当当前时间大于分区创建时间 +sink.partition-commit.delay 中定义的时间,提交分区;partition-time:需要 Source 表中定义 watermark,当 watermark > 提取到的分区时间 +sink.partition-commit.delay 中定义的时间,提交分区;
  • sink.partition-commit.delay:相当于延时时间;
  • sink.partition-commit.policy.kind:怎么提交,一般提交成功之后,需要通知 metastore,这样 Hive 才能读到你最新分区的数据;如果需要合并小文件,也可以自定义 Class,通过实现 PartitionCommitPolicy 接口。

接下来让我们把数据插入刚刚创建的 Hive Table:

%flink.ssql

insert into hive_table select  user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table

让程序再跑一会儿~我们先去倒一杯 95 年的 Java☕️ 。

然后再看看我们的 HDFS,看看路径下的东西。

kafka2.jpg

大家也可以用 Hive 自行查询看看,我呢就先卖个关子,一会儿用 Hive Streaming 来读数据。

Hive Streaming Source

因为 Hive 表上面已经创建过了,所以这边读数据的时候直接拿来用就行了,不同的地方是需要使用 Table Hints 去覆盖参数。

Hive Streaming Source 最大的不足是,无法读取已经读取过的分区下新增的文件。简单来说就是,读过的分区,就不会再读了。看似很坑,不过仔细想想,这样才符合流的特性。

照旧给大家说一下参数的意思:

  • stream-source.enable:显而易见,表示是否开启流模式。
  • stream-source.monitor-interval:监控新文件/分区产生的间隔。
  • stream-source.consume-order:可以选 create-time 或者 partition-time;create-time 指的不是分区创建时间,而是在 HDFS 中文件/文件夹的创建时间;partition-time 指的是分区的时间;对于非分区表,只能用 create-time。官网这边的介绍写的有点模糊,会让人误以为可以查到已经读过的分区下新增的文件,其实经过我的测试和翻看源码发现并不能。
  • stream-source.consume-start-offset:表示从哪个分区开始读。

光说不干假把式,让我们捞一把数据看看~

kafka3.jpg

SET 那一行得带着,不然无法使用 Table Hints。

Hive Temporal Table

看完了 Streaming Source 和 Streaming Sink,让我们最后再试一下 Hive 作为维表吧。

其实用 Hive 维表很简单,只要是在 Hive 中存在的表,都可以当做维表使用,参数完全可以用 Table Hints 来覆盖。

  • lookup.join.cache.ttl:表示缓存时间;这里值得注意的是,因为 Hive 维表会把维表所有数据缓存在 TM 的内存中,如果维表量很大,那么很容易就 OOM;如果 ttl 时间太短,那么会频繁的加载数据,性能会有很大影响。

kafka4.jpg

因为是 LEFT JOIN,所以维表中不存在的数据会以 NULL 补全。

再看一眼 DAG 图:

kafka5.jpg

大家看一下画框的地方,能看到这边是使用的维表关联 LookupJoin。

如果大家 SQL 语句写错了,丢了 for system_time as of a.p,那么 DAG 图就会变成这样:

kafka6.jpg

这种就不是维表 JOIN 其实更像是流和批在 JOIN。

写在最后

Hive Streaming 的完善意味着打通了流批一体的最后一道壁垒,既可以做到历史数据的 OLAP 分析,又可以实时吐出结果,这无疑是 ETL 开发者的福音,想必接下来的日子,会有更多的企业完成他们实时数仓的建设。

参考文档:

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/
[2]https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md

Note 下载:

https://github.com/lonelyGhostisdog/flinksql/blob/master/src/main/resources/Flink%20on%20Zeppelin/Hive%20Streaming%20Test.zpln

最后,给大家介绍一下 Flink on Zeppelin 的钉钉群,大家有问题可以在里面讨论,Apache Zeppelin PMC 简锋大佬也在里面,有问题可以直接在钉群中提问交流~

作者介绍:

狄杰,蘑菇街资深数据专家,负责蘑菇街实时计算平台 。目前 Focus 在 Flink on Zeppelin,Apache Zeppelin Contributor。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
25天前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
75 13
|
21天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
200 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
20天前
|
自然语言处理 搜索推荐 数据安全/隐私保护
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】
鸿蒙登录页面设计展示了 HarmonyOS 5.0(Next)的未来美学理念,结合科技与艺术,为用户带来视觉盛宴。该页面使用 ArkTS 开发,支持个性化定制和无缝智能设备连接。代码解析涵盖了声明式 UI、状态管理、事件处理及路由导航等关键概念,帮助开发者快速上手 HarmonyOS 应用开发。通过这段代码,开发者可以了解如何构建交互式界面并实现跨设备协同工作,推动智能生态的发展。
134 10
鸿蒙登录页面好看的样式设计-HarmonyOS应用开发实战与ArkTS代码解析【HarmonyOS 5.0(Next)】
|
21小时前
|
供应链 搜索推荐 API
深度解析1688 API对电商的影响与实战应用
在全球电子商务迅猛发展的背景下,1688作为知名的B2B电商平台,为中小企业提供商品批发、分销、供应链管理等一站式服务,并通过开放的API接口,为开发者和电商企业提供数据资源和功能支持。本文将深入解析1688 API的功能(如商品搜索、详情、订单管理等)、应用场景(如商品展示、搜索优化、交易管理和用户行为分析)、收益分析(如流量增长、销售提升、库存优化和成本降低)及实际案例,帮助电商从业者提升运营效率和商业收益。
26 14
|
5天前
|
数据采集 XML API
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
深入解析BeautifulSoup:从sohu.com视频页面提取关键信息的实战技巧
|
11天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
56 14
|
16天前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
1月前
|
数据采集 DataWorks 搜索推荐
阿里云DataWorks深度评测:实战视角下的全方位解析
在数字化转型的大潮中,高效的数据处理与分析成为企业竞争的关键。本文深入评测阿里云DataWorks,从用户画像分析最佳实践、产品体验、与竞品对比及Data Studio公测体验等多角度,全面解析其功能优势与优化空间,为企业提供宝贵参考。
116 13
|
30天前
|
数据采集 存储 JavaScript
网页爬虫技术全解析:从基础到实战
在信息爆炸的时代,网页爬虫作为数据采集的重要工具,已成为数据科学家、研究人员和开发者不可或缺的技术。本文全面解析网页爬虫的基础概念、工作原理、技术栈与工具,以及实战案例,探讨其合法性与道德问题,分享爬虫设计与实现的详细步骤,介绍优化与维护的方法,应对反爬虫机制、动态内容加载等挑战,旨在帮助读者深入理解并合理运用网页爬虫技术。
|
1月前
|
存储 监控 调度
云服务器成本优化深度解析与实战案例
本文深入探讨了云服务器成本优化的策略与实践,涵盖基本原则、具体策略及案例分析。基本原则包括以实际需求为导向、动态调整资源、成本控制为核心。具体策略涉及选择合适计费模式、优化资源配置、存储与网络配置、实施资源监控与审计、应用性能优化、利用优惠政策及考虑多云策略。文章还通过电商、制造企业和初创团队的实际案例,展示了云服务器成本优化的有效性,最后展望了未来的发展趋势,包括智能化优化、多云管理和绿色节能。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多