《Flink官方文档》Batch Examples(二)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 连通分支 连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接。类似PageRank,连通组件是一个迭代算法。在每个步骤中,每个顶点都将其当前组件ID传给所有邻居。如果小于自己的组件ID,一个顶点从邻居接受组件ID。

连通分支

连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接。类似PageRank,连通组件是一个迭代算法。在每个步骤中,每个顶点都将其当前组件ID传给所有邻居。如果小于自己的组件ID,一个顶点从邻居接受组件ID。

此实现使用增量迭代:组件ID未变化的顶点不参与下一步骤。因为后来的迭代通常只处理一些离群顶点,这将产生更好的性能。

// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                            Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}

scala

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}


verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

该连通分支程序实现了上述例子。它需要运行下列参数:–vertices –edges –output –iterations 。
输入文件是纯文本文件,必须格式化如下:

–Vertices 以IDS表示的顶点,由换行字符分隔。例如“1\n2\n12\n42\n63\n”给出了五个订单(1)、(2)、(12)、(42)和(63)。

–Edges 边通过以空格分隔的两个顶点ID表示。不同边是由换行符分隔。例如“1 2\n2 12\n1 12\n42 63\n”表示了四个无方向链接(1)-(2)、(2)-(12)、(1)-(12)和(42)-(63)。

关系型查询

关系型查询示例假定会使用两张表,一张订单表,另一张是TPC-H决策支持基准测试表。TPC-H是数据库行业标准基准测试。如何生成输入数据请参见下面的说明。

该示例实现以下sql查询。
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
FROM orders, lineitem
WHERE l_orderkey = o_orderkey
AND o_orderstatus = "F"
AND YEAR(o_orderdate) > 1993
AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;

Flink程序中按照如下的方式进行sql查询

// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// orders filtered by year: (orderkey, custkey)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if(!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0,4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
                            .where(0).equalTo(0)
                            .projectFirst(0,1).projectSecond(1)
                            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);

缺少scala例子(译者注)

关系查询程序实现了上述查询。它需要以下参数运行–orders –lineitem –output 。
order和lineitem文件可以使用TPC-H基准测试套件的数据生成工具(DBGEN)生成。采取以下步骤生成需提供给flink程序输入的任意大小的数据文件。

1、下载并解压DBGEN

2、复制makefile.suite并更名为Makefile,编辑修改如下:

DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
CC       = gcc

1、使用make命令构建DBGEN

2、使用DBGEN生成lineitem和orders表。-s命令传入1,将会一个生成约1 GB的大小的数据集。

./dbgen -T o -s 1


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
880 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
SQL 存储 算法
Flink Batch SQL Improvements on Lakehouse
阿里云研发工程师刘大龙(风离),在 Streaming Lakehouse Meetup 的分享。
643 1
Flink Batch SQL Improvements on Lakehouse
|
SQL 流计算
Flink CDC这俩statement mode和batch mode啥区别
Flink CDC这俩statement mode和batch mode啥区别
223 1
|
SQL 消息中间件 Kubernetes
Flink官方文档目录索引
前段时间工作比较繁忙,一直都没时间好好的去阅读Flink的文档,本文来整理展开后的Flink文档的所有目录,以便有一个全局的掌控,直接点击上面的目录结构即可查看下详情。
682 1
Flink官方文档目录索引
|
SQL 机器学习/深度学习 分布式计算
会议总结 | 首次 Flink Batch 社区开发者会议
2023 年 2 月 8 日 Flink Batch 社区开发者会议会议总结。
会议总结 | 首次 Flink Batch 社区开发者会议
|
Apache 流计算 开发者
邀请 | Flink Batch 社区开发者会议
Flink Batch 的社区开发者双周会(在线会议),2 月 8 日 (周三)晚上 19:00 到 20:00 邀您参与~
邀请 | Flink Batch 社区开发者会议
|
SQL 资源调度 分布式计算
Flink Batch SQL 1.10 实践
1.10可以说是第一个成熟的生产可用的Flink Batch SQL版本,它一扫之前Dataset的羸弱,从功能和性能上都有大幅改进,以下我从架构、外部系统集成、实践三个方面进行阐述。
|
流计算
实时计算Flink > 独享模式 > Batch(试用) > 创建结果表 —— 创建HDFS结果表
本页目录 创建 HDFS 结果表 DDL定义 WITH参数说明 创建 HDFS 结果表 DDL定义 Flink Batch支持使用HDFS作为结果输出。示例代码如下。
1336 0
|
Java 流计算 对象存储
实时计算Flink > 独享模式 > Batch(试用) > 创建结果表 —— 创建CSV结果表
本页目录 创建CSV结果表 WITH参数 创建CSV结果表 Flink可以将CSV格式文件作为目标表输出。示例如下。 create table csv_source( id varchar, name varchar )...
1580 0
|
存储 流计算 对象存储
实时计算Flink > 独享模式 > Batch(试用) > 创建源表 —— 创建Parquet源表
本页目录 创建Parquet源表 WITH参数 创建Parquet源表 Parquet是一种高性能的列式存储格式。您可以在Parquet的官网对这种存储格式做更详情的了解。 Flink可以读取Parquet格式数据作为输入。
1522 0

热门文章

最新文章