使用 Flink 实时发现最热 Github 项目实验手册|Flink-Learning 实战营

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 加入 Flink-Learning 实战营,动手体验真实有趣的实战场景。只需 2 小时,让您变身 Flink 实战派。实战营采取了 Flink 专家在线授课,专属社群答疑,小松鼠助教全程陪伴的学习模式。

作者|王洪顺(弘舜) 

实验简介

通过 Flink 对 GitHub 的实时事件流进行分析,并通过报表直观展示,了解 GitHub 的最新热门趋势、特定仓库或者组织的活跃度。

体验此场景后,可以对 Flink SQL 基础能力和 Flink 实时处理特性有直观的初步体验。

实验资源

本场景使用到的实验资源和配置如下:

阿里云实时计算 Flink 版

配置项 规格
Task Manger 个数 4 个
Task Manager CPU 2 核心
Task Manager Memory 8 GiB
Job Manager CPU 1 核
Job Manager Memory 2 GiB

体验目标

对 Flink SQL 基础能力和 Flink 实时处理特性有直观的初步体验。

背景知识

GitHub 公开数据集(GitHub Archive)是 GitHub 提供的一个开放数据集合,它包含了每个公共仓库的事件数据,例如提交、拉取请求、问题和评论等。GitHub 公开数据集的数据可以用于进行各种类型的研究和分析,例如开源社区的协作情况、开发者的行为特征、编程语言的发展趋势等。使开发者们更好地了解 GitHub 上的活动和趋势,并从中获得有价值的信息和洞察。

本实验将 GitHub 公开数据集实时同步到 SLS 作为数据源,根据 Flink 对数据进行多种维度的分析并且通过报表直观展示。

前置知识

  • 了解 Flink 相关的基础知识。
  • 了解 Flink SQL 相关的基础知识。

环境搭建

创建 Session 集群。进入阿里云控制台,选择实时计算 Flink 版。然后选择已经购买的工作空间。

1

在开始 VVP 作业编写前,需要先创建 Session 集群,只有创建了 Flink 集群,才能执行任务。

1.点击系统管理 -> Session 集群 ->创建 Session

2

2.创建 Session 集群时设置为 SQL Preview 集群,这样无需设置 Sink, 即可将 Select 语句的结果输出成图表的格式。

3

实验 1:Github 关注数排行榜

本实验统计从一周前起的 Github 关注度排行榜。

操作

1.作业 SQL 代码。其中 startTime 尽量设置为当前此刻的一周前附近,如果设置的时间太早,前面无效计算时间比较长,不仅耗费资源,而且很久才能加载出计算结果。根据不同的地域设置相应的 project 和endPoint,如实例为上海的服务平台,因此设置'project' = 'github-events-shanghai'和'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',其他地域如北京、杭州、深圳更改为对应值即可。

-- 通过DDL语句创建SLS源表,SLS中存放了Github的实时数据。
CREATE TEMPORARY TABLE gh_event(
  id STRING,                                        -- 每个事件的唯一ID。
  created_at BIGINT,                                -- 事件时间,单位秒。
  created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
  type STRING,                                      -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
  actor_id STRING,                                  -- Github用户ID。
  actor_login STRING,                               -- Github用户名。
  repo_id STRING,                                   -- Github仓库ID。
  repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
  org STRING,                                       -- Github组织ID。
  org_login STRING                                 -- Github组织名,如: apache,google,alibaba等。
) WITH (
  'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
  'project' = 'github-events-shanghai',                     -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
  'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限VVP通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
  'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
  'accessId' =  ' ',         -- 只读账号的AK。
  'accessKey' = ' ',   -- 只读账号的SK。
  'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
  'startTime' = '2023-06-01 14:00:00'              -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长。默认值为当前值
);

-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true'; 
SET 'table.exec.mini-batch.allow-latency'='2s'; 
SET 'table.exec.mini-batch.size'='4096'; 

-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';


-- 查看Github新增star数Top 5仓库。
SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as `date`, repo_name, COUNT(*) as num
FROM gh_event WHERE type = 'WatchEvent' 
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), repo_name
ORDER BY num DESC
LIMIT 5;

2.验证 SQL 是否正确并且执行

4

3.配置图表

a. 选择 Y Bar 并且编辑标题栏为 Top 5

5

b. 配置 group by repo_name, order by num,即根据 repo_name 分组比较数量

6

c. 实验可以一直运行,不断消费最新的数据。但是如果当前集群的 CPU 数配置的较少,不足以执行两个任务,又想执行下一个实验是,可以将本实验停止。点击结果左侧的红色方框即可。

7

结果

8

第一名:s0md3v/roop 视频换脸(最近我在b站也经常翻到)

第二名:pengzhile/pandora 潘多拉实现了网页版 ChatGPT 的主要操作

第三名:ClassmateLin/dm-ticket 大麦网抢票(疫情放开,估计上周演唱会很多)

第四名:ShishirPatil/gorilla 连接海量 API 的大型语言模型

第五名: iperov/DeepFaceLive 换脸

由此可见最近一周最流行的 repo 就是 ai 视频换脸和大模型,最流行的领域就是 ai

实验 2:统计组织活跃度变化

本实验统计 apache 和 alibaba 组织开源在从 24 小时前开始的活跃度趋势变化。

操作

1.SQL 代码如下。其中 startTime 尽量设置为当前此刻的 24 小时前附近,如果设置的时间太早,前面无效计算时间比较长,不仅耗费资源,而且很久才能加载出计算结果。如果想要统计 alibaba, 改成 org_login ='alibaba' 即可

CREATE TEMPORARY TABLE gh_event(
  id STRING,                                        -- 每个事件的唯一ID。
  created_at BIGINT,                                -- 事件时间,单位秒。
  created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
  type STRING,                                      -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
  actor_id STRING,                                  -- Github用户ID。
  actor_login STRING,                               -- Github用户名。
  repo_id STRING,                                   -- Github仓库ID。
  repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
  org STRING,                                       -- Github组织ID。
  org_login STRING                                 -- Github组织名,如: apache,google,alibaba等。
) WITH (
  'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
  'project' = 'github-events-shanghai',                     -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
  'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限VVP通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
  'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
  'accessId' =  ' ',         -- 只读账号的AK。
  'accessKey' = ' ',   -- 只读账号的SK。
  'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
  'startTime' = '2023-06-07 14:00:00'               -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长
);

-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';

-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';


-- 从一天前开始统计事件总量
SELECT NOW(), max(created_at_ts) as created_ts, COUNT(*) as event_count
FROM gh_event 
WHERE  org_login ='apache' and
created_at_ts >= NOW() - INTERVAL '1' DAY;

2.点击执行,并且配置图表

a. 点击图表配置

9

b. 编辑标题为"apache",并且选择 X/Y Line

10

c. 配置 X 轴为 create_ts, y 轴为 event_count

11

12

执行结果

13

14

apache 作为全球性的开源组织,一天内的活跃度比较均匀,而阿里巴巴开源基本由国内开发者关注和贡献,夜间增加比较平缓,在 9 点之后明显提升。

实验 3: 统计仓库贡献时间分布情况

本实验统计 flink 和 spark 开源仓库在从一周前前开始的贡献分布情况。贡献包括代码提交、commit 评论、issue 评论、提交 PR 请求、PR 请求的审查评论等与开发者相关的事件。

1.作业 SQL 代码。其中 startTime 尽量设置为当前此刻的一周前附近,如果设置的时间太早,前面无效计算时间比较长,不仅耗费资源,而且很久才能加载出计算结果。如果想要统计 spark, 改成 repo_name = 'apache/flink'' 即可。

CREATE TEMPORARY TABLE gh_event(
    id STRING,                                        -- 每个事件的唯一ID。
    created_at BIGINT,                                -- 事件时间,单位秒。
    created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件时间戳(当前会话时区下的时间戳,如:Asia/Shanghai)。
    type STRING,                                      -- Github事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
    actor_id STRING,                                  -- Github用户ID。
    actor_login STRING,                               -- Github用户名。
    repo_id STRING,                                   -- Github仓库ID。
    repo_name STRING,                                 -- Github仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
    org STRING,                                       -- Github组织ID。
    org_login STRING                                 -- Github组织名,如: apache,google,alibaba等。
) WITH (
  'connector' = 'sls',                              -- 实时采集的Github事件存放在阿里云SLS中。
  'project' = 'github-events-shanghai',                     -- 存放公开数据的SLS项目。例如'github-events-hangzhou'。
  'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限VVP通过私网地址访问。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
  'logStore' = 'realtime-github-events',            -- 存放公开数据的SLS logStore。
  'accessId' =  ' ',         -- 只读账号的AK。
  'accessKey' = ' ',   -- 只读账号的SK。
  'batchGetSize' = '500',                           -- 批量读取数据,每批最多拉取500条。
  'startTime' = '2023-06-01 14:00:00'              -- 开始时间,尽量设置到需要计算的时间附近,否则无效计算的时间较长
);

-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';

-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';


-- 配置开启mini-batch, 每2s处理一次。
SET 'table.exec.mini-batch.enabled'='true'; 
SET 'table.exec.mini-batch.allow-latency'='2s'; 
SET 'table.exec.mini-batch.size'='4096'; 

-- 作业设置4个并发,聚合更快。
SET 'parallelism.default' = '4';

-- 统计从上周起的贡献量
SELECT  DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as comment_date, HOUR(created_at_ts) AS comment_hour ,COUNT(*) AS comment_count
FROM gh_event
WHERE created_at_ts >= NOW() - INTERVAL '7' DAY 
        AND repo_name = 'apache/flink'
       AND (type ='CommitCommentEvent' OR 
            type='IssueCommentEvent' or 
            type = 'PullRequestReviewCommentEvent'or 
            type = 'PushEvent' or 
            type = 'PullRequestEvent' or 
            type = 'PullRequestReviewEvent')
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), HOUR(created_at_ts) ;

2.点击执行,并且配置图表。选择 Heatmap, 设置 Group by comment_date, Spli By comment_hour,Color为 Sum(comment_count), 即 X 轴为天,Y 周为小时,根据总数量显示颜色深浅。

15


实战营涉及的其他产品简介

可观测监控 Prometheus 版作为兼容可观测事实标准 – Prometheus 开源项
目的全托管服务。默认集成 Grafana 看板与智能告警功能。一键观测主流云
服务、自建组件/集群,覆盖业务监控/应用层监控/中间件监控/系统层监
控。全面优化探针性能与系统可用性,用户无需关注系统可用性与 Exporter
自研集成。帮助企业快速搭建一站式指标可观测体系。

负载均衡SLB是云原生时代应用高可用的基本要素,是阿里云官方云原生网关。SLB支持对4层、7层业务流量转发处理,通过将流量分发到不同的后端服务来扩展应用系统的服务吞吐能力,通过健康检查和故障自动隔离机制来消除单点故障并提升应用系统的可用性。SLB提供全托管式在线负载均衡服务,具有即开即用、超大容量、稳定可靠、弹性伸缩、按需付费等特点,适合大规模、高并发、高可用场景。


加入 Flink-Learning 实战营,动手体验真实有趣的实战场景。只需 2 小时,让您变身 Flink 实战派。实战营采取了 Flink 专家在线授课,专属社群答疑,助教全程陪伴的学习模式。

入营立享权益


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
设计模式 Java 数据库
持续霸榜GitHub的面试神器:字节跳动Java面试参考手册,限时开源
最近又赶上跳槽的高峰期(招聘旺季),好多读者都问我有没有面试字节的神器,我苦苦寻到了一份内部资料《2023字节跳动Java面试参考手册(第二期)》。
|
2月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
147 3
|
2月前
|
存储 SQL Oracle
flink cdc 时区问题之文档添加参数无效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
数据可视化 Java Maven
爆赞!GitHub上首本IntelliJ IDEA操作手册,标星果然百万名不虚传
还记得刚开始工作的时候使用的是Eclipse,后面是当时公司第一批尝鲜IDEA的人。刚开始用起来其实蛮麻烦的,因为最开始还是带着Eclipse的思维。 比如在Eclipse中一个workspace中可以有多个project,但是在IDEA中就没有workspace的概念了,取而代之的是project,一个project中可以有多个module。 已经不止N次的被读者问到有没有IDEA的教程,其实我觉得这就是一个工具,无非就是一个熟能生巧的过程。在N + 1次被问到的时候,我觉得有必要肝一份使用手册了! 我也去搜了一下,发现确实没有一个完整的系列教程,就算有也都是两三年前的版本了。
|
4月前
实时发现GitHub最热项目
实时发现GitHub最热项目
|
5月前
|
Java 关系型数据库 MySQL
太牛了! GitHub大牛呕心沥血整理的5000页Java学习手册文档
今天整理了一套 5000 页的 Java 学习手册,,新鲜出炉,分享给大家!此手册内容专注 Java技术,包括 JavaWeb,SSM,Linux,Spring Boot,MyBatis,MySQL,Nginx,Git,GitHub,Servlet,IDEA,多线程,集合,JVM,DeBug, Dubbo,Redis,算法,面试题等相关内容。
|
5月前
|
NoSQL Java Redis
阿里P8熬了一个月肝出这份32W字Java面试手册,在Github标星31K+
互联网行业竞争越来越严峻,面试也是越来越难,一直以来我都想整理一套完美的面试宝典,奈何难抽出时间,这套1000+道的Java面试手册我整理了整整1个月,上传到Git上目前star数达到了30K+
|
5月前
|
安全 Java 程序员
火爆全网的Spring Security手册及源码笔记,在Github上标星103K
Spring Security 是一个基于 Spring AOP 和 Servlet 过滤器的安全框架,它提供了安全性方面的解决方案
|
5月前
|
缓存 NoSQL 中间件
重磅来袭!腾讯T7手写高并发实战手册,称霸GitHub热榜
Netty为何这么火 Netty是互联网中间件领域使用最广泛、最核心的网络通信框架之一。几乎所有互联网中间件或者大数据领域均离不开Netty,掌握Netty是作为一名初中级工程师迈向高级工程师重要的技能之一。 Netty之所以受青睐,是因为Netty提供异步的、事件驱动的网络应用程序框架和工具。作为一个异步框架,Netty的所有IO操作都是异步非阻塞的,通过FutureListener机制,用户可以方便地主动获取或者通过通知机制获得IO操作结果。 为什么写Netty? 目前来说,主要的互联网公司,例如阿里、腾讯、美团、新浪、淘宝等,在高级工程师的面试过程中,就经常会问一些高
|
5月前
|
存储 网络安全 对象存储
使用Flink实时发现Github最热项目之免费试用开通教程文档
使用Flink实时发现Github最热项目之免费试用开通教程文档
136 1

相关产品

  • 实时计算 Flink版