Beam编程系列之Apache Beam WordCount Examples(MinimalWordCount example、WordCount example、Debugging WordCount example、WindowedWordCount example)(官网的推荐步骤)

简介:
https://beam.apache.org/get-started/wordcount-example/

 

 

 

 

  来自官网的:

The WordCount examples demonstrate how to set up a processing pipeline that can read text, tokenize the text lines into individual words, and perform a frequency count on each of those words. The Beam SDKs contain a series of these four successively more detailed WordCount examples that build on each other. The input text for all the examples is a set of Shakespeare’s texts.

Each WordCount example introduces different concepts in the Beam programming model. Begin by understanding Minimal WordCount, the simplest of the examples. Once you feel comfortable with the basic principles in building a pipeline, continue on to learn more concepts in the other examples.

  • Minimal WordCount demonstrates the basic principles involved in building a pipeline.
  • WordCount introduces some of the more common best practices in creating re-usable and maintainable pipelines.
  • Debugging WordCount introduces logging and debugging practices.
  • Windowed WordCount demonstrates how you can use Beam’s programming model to handle both bounded and unbounded datasets.

 

 

 

 

 

 

 

  我这里仅以Minimal WordCount为例。

  首先说明一下,为了简单起见,我直接在代码中显式配置指定PipelineRunner,示例代码片段如下所示:

PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);

 

  如果要部署到服务器上,可以通过命令行的方式指定PipelineRunner,比如要在Spark集群上运行,类似如下所示命令行:

spark-submit --class org.shirdrn.beam.examples.MinimalWordCountBasedSparkRunner 2017-01-18 --master spark://myserver:7077 target/my-beam-apps-0.0.1-SNAPSHOT-shaded.jar --runner=SparkRunner

 

 

 

 

  下面,我们从几个典型的例子来看(基于Apache Beam软件包的examples有所改动),Apache Beam如何构建Pipeline并运行在指定的PipelineRunner上:

  • WordCount(Count/Source/Sink)

  我们根据Apache Beam的MinimalWordCount示例代码开始,看如何构建一个Pipeline,并最终执行它。 MinimalWordCount的实现,代码如下所示:

复制代码
package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class MinimalWordCount {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式)

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.Read.from("/tmp/dataset/apache_beam.txt")) // 读取本地文件,构建第一个PTransform
                .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { // 对文件中每一行进行处理(实际上Split)

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        for (String word : c.element().split("[\\s:\\,\\.\\-]+")) {
                            if (!word.isEmpty()) {
                                c.output(word);
                            }
                        }
                    }

                }))
                .apply(Count.<String> perElement()) // 统计每一个Word的Count
                .apply("ConcatResultKVs", MapElements.via( // 拼接最后的格式化输出(Key为Word,Value为Count)
                        new SimpleFunction<KV<String, Long>, String>() {

                    @Override
                    public String apply(KV<String, Long> input) {
                        return input.getKey() + ": " + input.getValue();
                    }

                }))
                .apply(TextIO.Write.to("wordcount")); // 输出结果

        pipeline.run().waitUntilFinish();
    }
}
复制代码

 

 

  Pipeline的具体含义,可以看上面代码的注释信息。下面,我们考虑以HDFS数据源作为Source,如何构建第一个PTransform,代码片段如下所示:

PCollection<KV<LongWritable, Text>> resultCollection = pipeline.apply(HDFSFileSource.readFrom(
        "hdfs://myserver:8020/data/ds/beam.txt",
        TextInputFormat.class, LongWritable.class, Text.class))

 

 

  可以看到,返回的是具有键值分别为LongWritable、Text类型的KV对象集合,后续处理和上面处理逻辑类似。如果使用Maven构建Project,需要加上如下依赖(这里beam.version的值可以为最新Release版本0.4.0):

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hdfs</artifactId>
    <version>${beam.version}</version>
</dependency>

 

 
 
  • 去重(Distinct)

去重也是对数据集比较常见的操作,使用Apache Beam来实现,示例代码如下所示:

复制代码
package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Distinct;

public class DistinctExample {

    public static void main(String[] args) throws Exception {

         PipelineOptions options = PipelineOptionsFactory.create();
         options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式)

         Pipeline pipeline = Pipeline.create(options);
         pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_ID_FILE.txt"))
             .apply(Distinct.<String> create()) // 创建一个处理String类型的PTransform:Distinct
             .apply(TextIO.Write.to("deduped.txt")); // 输出结果
         pipeline.run().waitUntilFinish();
    }
}
复制代码

 

 

 

 

 

 

  • 分组(GroupByKey)

对数据进行分组操作也非常普遍,我们拿一个最基础的PTransform实现GroupByKey来实现一个例子,代码如下所示:

复制代码
package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class GroupByKeyExample {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式)

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_INFO_FILE.txt"))
            .apply("ExtractFields", ParDo.of(new DoFn<String, KV<String, String>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    // file format example: 35451605324179    3G    CMCC
                    String[] values = c.element().split("\t");
                    if(values.length == 3) {
                        c.output(KV.of(values[1], values[0]));
                    }
                }
            }))
            .apply("GroupByKey", GroupByKey.<String, String>create()) // 创建一个GroupByKey实例的PTransform
            .apply("ConcatResults", MapElements.via(
                    new SimpleFunction<KV<String, Iterable<String>>, String>() {

                        @Override
                        public String apply(KV<String, Iterable<String>> input) {
                            return new StringBuffer()
                                    .append(input.getKey()).append("\t")
                                    .append(Joiner.on(",").join(input.getValue()))
                                    .toString();
                        }


            }))
            .apply(TextIO.Write.to("grouppedResults"));

        pipeline.run().waitUntilFinish();

    }
}
复制代码

  使用DirectRunner运行,输出文件名称类似于grouppedResults-00000-of-00002、grouppedResults-00001-of-00002等等。

 

 

 

 

  • 连接(Join)

  最后,我们通过实现一个Join的例子,其中,用户的基本信息包含ID和名称,对应文件格式如下所示:

35451605324179    Jack
35236905298306    Jim
35236905519469    John
35237005022314    Linda

 

 

  另一个文件是用户使用手机的部分信息,文件格式如下所示:

35451605324179    3G    中国移动
35236905298306    2G    中国电信
35236905519469    4G    中国移动

 

 

  我们希望通过Join操作后,能够知道用户使用的什么网络(用户名+网络),使用Apache Beam实现,具体实现代码如下所示:

复制代码
package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class JoinExample {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);  // 显式指定PipelineRunner:DirectRunner(Local模式)

        Pipeline pipeline = Pipeline.create(options);

        // create ID info collection
        final PCollection<KV<String, String>> idInfoCollection = pipeline
                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_INFO_FILE.txt"))
                .apply("CreateUserIdInfoPairs", MapElements.via(
                        new SimpleFunction<String, KV<String, String>>() {

                    @Override
                    public KV<String, String> apply(String input) {
                        // line format example: 35451605324179    Jack
                        String[] values = input.split("\t");
                        return KV.of(values[0], values[1]);
                    }

                }));

        // create operation collection
        final PCollection<KV<String, String>> opCollection = pipeline
                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_OP_INFO_FILE.txt"))
                .apply("CreateIdOperationPairs", MapElements.via(
                        new SimpleFunction<String, KV<String, String>>() {

                    @Override
                    public KV<String, String> apply(String input) {
                        // line format example: 35237005342309    3G    CMCC
                        String[] values = input.split("\t");
                        return KV.of(values[0], values[1]);
                    }

                }));

        final TupleTag<String> idInfoTag = new TupleTag<String>();
        final TupleTag<String> opInfoTag = new TupleTag<String>();

        final PCollection<KV<String, CoGbkResult>> cogrouppedCollection = KeyedPCollectionTuple
                .of(idInfoTag, idInfoCollection)
                .and(opInfoTag, opCollection)
                .apply(CoGroupByKey.<String>create());

        final PCollection<KV<String, String>> finalResultCollection = cogrouppedCollection
                .apply("CreateJoinedIdInfoPairs", ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<String, CoGbkResult> e = c.element();
                    String id = e.getKey();
                    String name = e.getValue().getOnly(idInfoTag);
                    for (String opInfo : c.element().getValue().getAll(opInfoTag)) {
                      // Generate a string that combines information from both collection values
                      c.output(KV.of(id, "\t" + name + "\t" + opInfo));
                    }
                }
        }));

        PCollection<String> formattedResults = finalResultCollection
                .apply("FormatFinalResults", ParDo.of(new DoFn<KV<String, String>, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(c.element().getKey() + "\t" + c.element().getValue());
                  }
                }));

         formattedResults.apply(TextIO.Write.to("joinedResults"));
         pipeline.run().waitUntilFinish();

    }



相关文章
|
5天前
|
API Windows
解决echarts.apache.org官网不能访问的问题(主要是用于查看配置项API参数细节说明)
解决echarts.apache.org官网不能访问的问题(主要是用于查看配置项API参数细节说明)
|
11月前
|
网络协议 Apache
“Apache官网打不开”怎么办?
“Apache官网打不开”怎么办?
|
分布式计算 大数据 Apache
Apache Beam WordCount编程实战及源码解读
概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码 负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理
3522 0
|
5天前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
722 5
|
1天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 0
|
2天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
14 5
|
2天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 1
|
2天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之报错:WARN (org.apache.kafka.clients.consumer.ConsumerConfig:logUnused)这个错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 3
|
5天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1810 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
5天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1797 2
官宣|Apache Flink 1.19 发布公告

热门文章

最新文章

推荐镜像

更多