Apache Beam WordCount编程实战及源码解读

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码

Apache Beam WordCount编程实战及源码解读

负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。

1.Apache Beam编程实战–前言,Apache Beam的特点与关键概念。

Apache Beam 于2017年1月10日成为Apache新的顶级项目。

1.1.Apache Beam 特点:

  • 统一:对于批处理和流媒体用例使用单个编程模型。
  • 方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
  • 可扩展:编写和分享新的SDKs,IO连接器和transformation库
    部分翻译摘自官网:Apacher Beam 官网

1.2.Apache Beam关键概念:

1.2.1.Apache Beam SDKs

主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。

1.2.2. Apache Beam Pipeline Runners(Beam的执行器/执行者们),支持Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多个大数据计算框架。可谓是一处Apache Beam编程,多计算框架运行。

1.2.3. 他们的对如下的支持情况详见

Apache Beam WordCount编程实战及源码解读

2.Apache Beam编程实战–Apache Beam源码解读

基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。

2.1.源码解析-Apache Beam 数据流处理原理解析:

关键步骤:

  • 创建Pipeline
  • 将转换应用于Pipeline
  • 读取输入文件
  • 应用ParDo转换
  • 应用SDK提供的转换(例如:Count)
  • 写出输出
  • 运行Pipeline

Apache Beam WordCount编程实战及源码解读

2.2.源码解析,完整项目Github源码,附WordCount,pom.xml等

/**
 * MIT.
 * Author: wangxiaolei(王小雷).
 * Date:17-2-20.
 * Project:ApacheBeamWordCount.
 */


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


public class WordCount {

    /**
     *1.a.通过Dofn编程Pipeline使得代码很简洁。b.对输入的文本做单词划分,输出。
     */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines =
                createAggregator("emptyLines", Sum.ofLongs());

        @ProcessElement
        public void processElement(ProcessContext c) {
            if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
            }

            // 将文本行划分为单词
            String[] words = c.element().split("[^a-zA-Z']+");
            // 输出PCollection中的单词
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     *2.格式化输入的文本数据,将转换单词为并计数的打印字符串。
     */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }
    /**
     *3.单词计数,PTransform(PCollection Transform)将PCollection的文本行转换成格式化的可计数单词。
     */
    public static class CountWords extends PTransform<PCollection<String>,
            PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

            // 将文本行转换成单个单词
            PCollection<String> words = lines.apply(
                    ParDo.of(new ExtractWordsFn()));

            // 计算每个单词次数
            PCollection<KV<String, Long>> wordCounts =
                    words.apply(Count.<String>perElement());

            return wordCounts;
        }
    }

    /**
     *4.可以自定义一些选项(Options),比如文件输入输出路径
     */
    public interface WordCountOptions extends PipelineOptions {

        /**
         * 文件输入选项,可以通过命令行传入路径参数,路径默认为gs://apache-beam-samples/shakespeare/kinglear.txt
         */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();
        void setInputFile(String value);

        /**
         * 设置结果文件输出路径,在intellij IDEA的运行设置选项中或者在命令行中指定输出文件路径,如./pom.xml
         */
        @Description("Path of the file to write to")
        @Required
        String getOutput();
        void setOutput(String value);
    }
    /**
     * 5.运行程序
     */
    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

        p.run().waitUntilFinish();
    }
}

3.支持Spark,Flink,Apex等大数据数据框架来运行该WordCount程序。完整项目Github源码(推荐,注意pom.xml模块加载是否成功,在工具中开发大数据程序,利于调试,开发体验较好)

3.1.intellij IDEA(社区版)中Spark大数据框架运行Pipeline计算程序

  • Spark运行

    • 设置VM options

      -DPapex-runner
    • 设置Programe arguments

      --inputFile=pom.xml --output=counts

Apache Beam WordCount编程实战及源码解读

3.2.intellij IDEA(社区版)中Apex,Flink等支持的大数据框架均可运行WordCount的Pipeline计算程序,完整项目Github源码

  • Apex运行

    • 设置VM options

      -DPapex-runner
    • 设置Programe arguments

      --inputFile=pom.xml --output=counts
  • Flink运行等等

    • 设置VM options

      -DPflink-runner
    • 设置Programe arguments

      --inputFile=pom.xml --output=counts

4.终端运行(Terminal)(不推荐,第一次下载过程很慢,开发体验较差)

4.1.以下命令是下载官方示例源码,第一次运行下载较慢,如果失败了就多运行几次,(推荐下载,完整项目Github源码)直接用上述解读在intellij IDEA中运行。

mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false

Apache Beam WordCount编程实战及源码解读

4.2.打包并运行

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner

Apache Beam WordCount编程实战及源码解读

4.3.成功运行结果

4.3.1.显示运行成功

Apache Beam WordCount编程实战及源码解读

4.3.2.WordCount输出计算结果

这里写图片描述

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
8月前
|
域名解析 Linux Apache
Linux Apache服务详解——虚拟网站主机功能实战
Linux Apache服务详解——虚拟网站主机功能实战
193 5
|
8月前
|
运维 Linux Apache
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
117 2
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
102 5
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
57 3
|
4月前
|
前端开发 JavaScript Java
Apache Wicket 框架:踏上从新手到英雄的逆袭之路,成就你的编程传奇!
【9月更文挑战第4天】Apache Wicket是一款基于Java的开源Web应用框架,以简洁、易维护及强大功能著称。它采用组件化设计,让页面开发更为模块化。Wicket的简洁编程模型、丰富的组件库、良好的可维护性以及对Ajax的支持,使其成为高效开发Web应用的理想选择。下文将通过解析Wicket的基本概念与特性,帮助读者深入了解这一框架的优势。
197 1
|
4月前
|
Java API Apache
从零到英雄的蜕变:如何用Apache Wicket打造你的第一个Web应用——不仅是教程,更是编程之旅的启航
【9月更文挑战第4天】学习Apache Wicket这一开源Java Web应用框架是一段激动人心的旅程。本文将指导你通过Maven搭建环境,并创建首个“Hello, World!”应用。从配置`pom.xml`到实现`HelloWorldApplication`类,再到`web.xml`的设置,一步步教你构建与部署简单网页。适合初学者快速上手,体验其简洁API与强大组件化设计的魅力。
101 1
|
5月前
|
关系型数据库 Linux 网络安全
"Linux系统实战:从零开始部署Apache+PHP Web项目,轻松搭建您的在线应用"
【8月更文挑战第9天】Linux作为服务器操作系统,凭借其稳定性和安全性成为部署Web项目的优选平台。本文以Apache Web服务器和PHP项目为例,介绍部署流程。首先,通过包管理器安装Apache与PHP;接着创建项目目录,并上传项目文件至该目录;根据需要配置Apache虚拟主机;最后重启Apache服务并测试项目。确保防火墙允许HTTP流量,正确配置数据库连接,并定期更新系统以维持安全。随着项目复杂度提升,进一步学习高级配置将变得必要。
409 0
|
7月前
|
存储 Apache 文件存储
在Apache环境下为Web网站增设访问控制:实战指南
在Apache服务器上保护网站资源涉及启用访问控制模块(`mod_authz_core`和`mod_auth_basic`),在`.htaccess`或`httpd.conf`中设定权限,如限制对特定目录的访问。创建`.htpasswd`文件存储用户名和密码,并使用`htpasswd`工具管理用户。完成配置后重启Apache服务,访问受限目录时需提供有效的用户名和密码。对于高安全性需求,可考虑更复杂的认证方法。【6月更文挑战第20天】
440 4
|
7月前
|
弹性计算 应用服务中间件 Linux
双剑合璧:在同一ECS服务器上共存Apache与Nginx的实战攻略
在ECS服务器上同时部署Apache和Nginx的实战:安装更新系统,Ubuntu用`sudo apt install apache2 nginx`,CentOS用`sudo yum install httpd nginx`。配置Nginx作为反向代理,处理静态内容及转发动态请求到Apache(监听8080端口)。调整Apache的`ports.conf`监听8080。重启服务测试,实现两者高效协同,提升Web服务性能。记得根据流量和需求优化配置。【6月更文挑战第21天】
625 1
|
6月前
|
分布式计算 Apache Spark

推荐镜像

更多