《Flink官方文档》Batch Examples(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 连通分支 连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接。类似PageRank,连通组件是一个迭代算法。在每个步骤中,每个顶点都将其当前组件ID传给所有邻居。如果小于自己的组件ID,一个顶点从邻居接受组件ID。

连通分支

连通分支算法识别会一个更大的图,这部分图通过被相同的组件ID链接的所有顶点连接。类似PageRank,连通组件是一个迭代算法。在每个步骤中,每个顶点都将其当前组件ID传给所有邻居。如果小于自己的组件ID,一个顶点从邻居接受组件ID。

此实现使用增量迭代:组件ID未变化的顶点不参与下一步骤。因为后来的迭代通常只处理一些离群顶点,这将产生更好的性能。

// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                            Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}

scala

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}


verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

该连通分支程序实现了上述例子。它需要运行下列参数:–vertices –edges –output –iterations 。
输入文件是纯文本文件,必须格式化如下:

–Vertices 以IDS表示的顶点,由换行字符分隔。例如“1\n2\n12\n42\n63\n”给出了五个订单(1)、(2)、(12)、(42)和(63)。

–Edges 边通过以空格分隔的两个顶点ID表示。不同边是由换行符分隔。例如“1 2\n2 12\n1 12\n42 63\n”表示了四个无方向链接(1)-(2)、(2)-(12)、(1)-(12)和(42)-(63)。

关系型查询

关系型查询示例假定会使用两张表,一张订单表,另一张是TPC-H决策支持基准测试表。TPC-H是数据库行业标准基准测试。如何生成输入数据请参见下面的说明。

该示例实现以下sql查询。
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
FROM orders, lineitem
WHERE l_orderkey = o_orderkey
AND o_orderstatus = "F"
AND YEAR(o_orderdate) > 1993
AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;

Flink程序中按照如下的方式进行sql查询

// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// orders filtered by year: (orderkey, custkey)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if(!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0,4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
                            .where(0).equalTo(0)
                            .projectFirst(0,1).projectSecond(1)
                            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);

缺少scala例子(译者注)

关系查询程序实现了上述查询。它需要以下参数运行–orders –lineitem –output 。
order和lineitem文件可以使用TPC-H基准测试套件的数据生成工具(DBGEN)生成。采取以下步骤生成需提供给flink程序输入的任意大小的数据文件。

1、下载并解压DBGEN

2、复制makefile.suite并更名为Makefile,编辑修改如下:

DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
CC       = gcc

1、使用make命令构建DBGEN

2、使用DBGEN生成lineitem和orders表。-s命令传入1,将会一个生成约1 GB的大小的数据集。

./dbgen -T o -s 1


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
SQL 存储 算法
Flink Batch SQL Improvements on Lakehouse
阿里云研发工程师刘大龙(风离),在 Streaming Lakehouse Meetup 的分享。
545 1
Flink Batch SQL Improvements on Lakehouse
|
SQL 流计算
Flink CDC这俩statement mode和batch mode啥区别
Flink CDC这俩statement mode和batch mode啥区别
125 1
|
SQL 机器学习/深度学习 分布式计算
会议总结 | 首次 Flink Batch 社区开发者会议
2023 年 2 月 8 日 Flink Batch 社区开发者会议会议总结。
会议总结 | 首次 Flink Batch 社区开发者会议
|
Apache 流计算 开发者
邀请 | Flink Batch 社区开发者会议
Flink Batch 的社区开发者双周会(在线会议),2 月 8 日 (周三)晚上 19:00 到 20:00 邀您参与~
邀请 | Flink Batch 社区开发者会议
|
SQL 消息中间件 Kubernetes
Flink官方文档目录索引
前段时间工作比较繁忙,一直都没时间好好的去阅读Flink的文档,本文来整理展开后的Flink文档的所有目录,以便有一个全局的掌控,直接点击上面的目录结构即可查看下详情。
523 1
Flink官方文档目录索引
|
SQL 资源调度 分布式计算
Flink Batch SQL 1.10 实践
1.10可以说是第一个成熟的生产可用的Flink Batch SQL版本,它一扫之前Dataset的羸弱,从功能和性能上都有大幅改进,以下我从架构、外部系统集成、实践三个方面进行阐述。
|
流计算
实时计算Flink > 独享模式 > Batch(试用) > 创建结果表 —— 创建HDFS结果表
本页目录 创建 HDFS 结果表 DDL定义 WITH参数说明 创建 HDFS 结果表 DDL定义 Flink Batch支持使用HDFS作为结果输出。示例代码如下。
1271 0
|
Java 流计算 对象存储
实时计算Flink > 独享模式 > Batch(试用) > 创建结果表 —— 创建CSV结果表
本页目录 创建CSV结果表 WITH参数 创建CSV结果表 Flink可以将CSV格式文件作为目标表输出。示例如下。 create table csv_source( id varchar, name varchar )...
1522 0
|
存储 流计算 对象存储
实时计算Flink > 独享模式 > Batch(试用) > 创建源表 —— 创建Parquet源表
本页目录 创建Parquet源表 WITH参数 创建Parquet源表 Parquet是一种高性能的列式存储格式。您可以在Parquet的官网对这种存储格式做更详情的了解。 Flink可以读取Parquet格式数据作为输入。
1453 0
|
对象存储 流计算 存储
实时计算Flink &gt; 独享模式 &gt; Batch(试用) &gt; 创建源表 —— 创建OSS源表
本页目录 创建OSS源表 OSS With参数 创建OSS源表 OSS 阿里云对象存储服务(Object Storage Service),简称OSS。为您提供基于网络的数据存取服务。
1762 0