[Flink]Flink1.3 Batch指南一 本地运行

简介: Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。

Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。本节概述了Flink的本地执行机制。

本地环境和执行器(executors)允许你可以在本地Java虚拟机上运行Flink程序,或者是在正在运行程序的Java虚拟机上(with within any JVM as part of existing programs)。对于大部分示例程序而言,你只需简单的地点击你IDE上的运行(Run)按钮就可以执行。

Flink支持两种不同的本地运行机制: (1) LocalExecutionEnvironment启动完整的Flink运行环境,包括一个JobManager和一个TaskManager。这些包含了内存管理以及在集群模式下运行时所运行的所有内部算法。 (2) CollectionEnvironment在Java集合上运行Flink程序(executing the Flink program on Java collections)。这种模式不会启动完整的Flink运行环境,因此运行开销比较低以及轻量级。例如,DataSet的map转换操作将map()函数应用于Java列表中的所有元素上。

1. 调试

如果你在本地运行Flink程序,还可以像任何其他Java程序一样来调试程序。你可以使用System.out.println()来打印一些内部变量,也可以使用调试器。可以在map(),reduce()以及所有其他方法中设置断点。请参阅Java API文档中的调试部分,来了解如何使用Java API来测试和本地调试程序。

2. Maven

如果你在Maven项目中开发程序,则必须使用下面依赖关系添加flink-clients模块:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>1.3.2</version>
</dependency>

3. 本地运行环境

LocalEnvironment是本地运行Flink程序的句柄,可以使用它在本地的JVM,独立运行或嵌入其他程序里运行。

本地运行执行环境通过ExecutionEnvironment.createLocalEnvironment()方法实例化。默认情况下,Flink将尽可能使用跟你机器CPU核数一样多的本地线程来执行程序。你可以指定程序你想要的并行度。本地运行环境可以通过enableLogging()/disableLogging()来配置日志的输出。

在大多数情况下,ExecutionEnvironment.getExecutionEnvironment()是一种更好的选择。当程序在本地启动时(不使用命令行接口),该方法返回LocalEnvironment,当程序是通过命令行接口提交时,则该方法会返回为在集群中运行提前配置好的运行环境(pre-configured environment for cluster execution)。

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

    DataSet<String> data = env.readTextFile("file:///path/to/file");

    data.filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("file:///path/to/result");

    JobExecutionResult res = env.execute();
}

在程序执行结束时会返回JobExecutionResult对象,这个类中包含了程序的运行状态(runtime)和累加器(accumulator)结果。

LocalEnvironment也可以向Flink传入用户自定义配置。

Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

备注:

本地运行环境不启动任何Web前端来监控运行。

4. 集合运行环境

使用CollectionEnvironment在Java集合上运行,对于运行Flink程序是一种开销比较低的方法。在这种模式中通常用于自动化测试、调试、代码重用等场景。

用户可以使用用于批处理的算法,或者是用于更具交互性的算法(Users can use algorithms implemented for batch processing also for cases that are more interactive)。Flink程序通过稍微修改就可用于处理请求的Java应用服务器。

下面是集合环境的例子:

public static void main(String[] args) throws Exception {
    // initialize a new Collection-based execution environment
    final ExecutionEnvironment env = new CollectionEnvironment();

    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);

    /* Data Set transformations ... */

    // retrieve the resulting Tuple2 elements into a ArrayList.
    Collection<...> result = new ArrayList<...>();
    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));

    // kick off execution.
    env.execute();

    // Do some work with the resulting ArrayList (=Collection).
    for(... t : result) {
        System.err.println("Result = "+t);
    }
}

flink-examples-batch模块包含一个完整的示例,名称为CollectionExecutionExample

备注:

基于集合的Flink程序仅适用于小数据量,这样可以完全放进JVM堆中。在集合上的运行不是多线程的,只使用一个线程。

备注:

Flink版本为1.3

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/local_execution.html

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
11月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1779 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
328 1
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL Kubernetes 流计算
实时计算 Flink版产品使用问题之如何在Windows上运行
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之运行run-application --target kubernetes-application执行,通过进程的返回码来决定作业是否成功,任务返回码都是0,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
机器学习/深度学习 人工智能 Apache
人工智能平台PAI操作报错合集之alink任务可以在本地运行,上传到flink web运行就报错,如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
缓存 Kubernetes Java
实时计算 Flink版产品使用合集之nk任务在k8s上运行,数据量大时经常失败,并且某个TaskManager被cgroup杀掉,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL API 数据处理
实时计算 Flink版产品使用问题之如何避免集群重启后job信息和运行状态丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章