[Flink]Flink1.3 Batch指南一 本地运行

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。

Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。本节概述了Flink的本地执行机制。

本地环境和执行器(executors)允许你可以在本地Java虚拟机上运行Flink程序,或者是在正在运行程序的Java虚拟机上(with within any JVM as part of existing programs)。对于大部分示例程序而言,你只需简单的地点击你IDE上的运行(Run)按钮就可以执行。

Flink支持两种不同的本地运行机制: (1) LocalExecutionEnvironment启动完整的Flink运行环境,包括一个JobManager和一个TaskManager。这些包含了内存管理以及在集群模式下运行时所运行的所有内部算法。 (2) CollectionEnvironment在Java集合上运行Flink程序(executing the Flink program on Java collections)。这种模式不会启动完整的Flink运行环境,因此运行开销比较低以及轻量级。例如,DataSet的map转换操作将map()函数应用于Java列表中的所有元素上。

1. 调试

如果你在本地运行Flink程序,还可以像任何其他Java程序一样来调试程序。你可以使用System.out.println()来打印一些内部变量,也可以使用调试器。可以在map(),reduce()以及所有其他方法中设置断点。请参阅Java API文档中的调试部分,来了解如何使用Java API来测试和本地调试程序。

2. Maven

如果你在Maven项目中开发程序,则必须使用下面依赖关系添加flink-clients模块:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>1.3.2</version>
</dependency>

3. 本地运行环境

LocalEnvironment是本地运行Flink程序的句柄,可以使用它在本地的JVM,独立运行或嵌入其他程序里运行。

本地运行执行环境通过ExecutionEnvironment.createLocalEnvironment()方法实例化。默认情况下,Flink将尽可能使用跟你机器CPU核数一样多的本地线程来执行程序。你可以指定程序你想要的并行度。本地运行环境可以通过enableLogging()/disableLogging()来配置日志的输出。

在大多数情况下,ExecutionEnvironment.getExecutionEnvironment()是一种更好的选择。当程序在本地启动时(不使用命令行接口),该方法返回LocalEnvironment,当程序是通过命令行接口提交时,则该方法会返回为在集群中运行提前配置好的运行环境(pre-configured environment for cluster execution)。

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

    DataSet<String> data = env.readTextFile("file:///path/to/file");

    data.filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("file:///path/to/result");

    JobExecutionResult res = env.execute();
}

在程序执行结束时会返回JobExecutionResult对象,这个类中包含了程序的运行状态(runtime)和累加器(accumulator)结果。

LocalEnvironment也可以向Flink传入用户自定义配置。

Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

备注:

本地运行环境不启动任何Web前端来监控运行。

4. 集合运行环境

使用CollectionEnvironment在Java集合上运行,对于运行Flink程序是一种开销比较低的方法。在这种模式中通常用于自动化测试、调试、代码重用等场景。

用户可以使用用于批处理的算法,或者是用于更具交互性的算法(Users can use algorithms implemented for batch processing also for cases that are more interactive)。Flink程序通过稍微修改就可用于处理请求的Java应用服务器。

下面是集合环境的例子:

public static void main(String[] args) throws Exception {
    // initialize a new Collection-based execution environment
    final ExecutionEnvironment env = new CollectionEnvironment();

    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);

    /* Data Set transformations ... */

    // retrieve the resulting Tuple2 elements into a ArrayList.
    Collection<...> result = new ArrayList<...>();
    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));

    // kick off execution.
    env.execute();

    // Do some work with the resulting ArrayList (=Collection).
    for(... t : result) {
        System.err.println("Result = "+t);
    }
}

flink-examples-batch模块包含一个完整的示例,名称为CollectionExecutionExample

备注:

基于集合的Flink程序仅适用于小数据量,这样可以完全放进JVM堆中。在集合上的运行不是多线程的,只使用一个线程。

备注:

Flink版本为1.3

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/local_execution.html

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7月前
|
Oracle Java 关系型数据库
实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
105 6
|
2月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
80 1
|
4月前
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Kubernetes 流计算
实时计算 Flink版产品使用问题之如何在Windows上运行
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL API 数据处理
实时计算 Flink版产品使用问题之如何避免集群重启后job信息和运行状态丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之直接killyarn-session集群导致正在运行的任务失败,该如何恢复
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之运行run-application --target kubernetes-application执行,通过进程的返回码来决定作业是否成功,任务返回码都是0,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
机器学习/深度学习 人工智能 Apache
人工智能平台PAI操作报错合集之alink任务可以在本地运行,上传到flink web运行就报错,如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。