A Complete Example

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: A Complete Example这个例子将关于人员的记录流作为输入,并将其过滤为只包含成人。import org.apache.flink.streaming.api.environment.

A Complete Example

这个例子将关于人员的记录流作为输入,并将其过滤为只包含成人。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
            new Person("Fred", 35),
            new Person("Wilma", 35),
            new Person("Pebbles", 2)
        );

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {};

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        };

        public String toString() {
            return this.name.toString() + ": age" + this.age.toSrting();
        };
    }
}

流执行环境

每个Flink应用程序都需要一个执行环境,在这个例子中是 env。流式应用需要使用 StreamExecutionEnvironment

在你的应用程序中DataStream API的调用会建立一个关联到StreamExecutionEnvironment的作业图。当env.execute()被调用这个作业图就会被打包并发送给 Job Manager(作业管理器),作业管理器将作业并行化并将其片段分发给Task Manager(任务管理器)用于执行。每个作业的并行切片将会在task slot(任务槽)中执行。

需要注意的是,如果你不调用 execute()你的应用就不会跑。

processes

此分布式运行时取决于您的应用程序是否可序列化。它还要求集群中的每个节点都可以使用所有依赖项。

基本流源

在上面的例子中我们通过env.fromElements(...)构建了一个DataStream<Person>。这是将简单流集合在一起以便在原型或测试中使用的便捷方式。在StreamExecutionEnvironment上同样有一个方法fromCollection(Collection)。我们可以这样实现:

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);

在原型设计时将一些数据放入流中的另一种简单方式是使用套接字

DataStream<String> lines = env.socketTextStream("localhost", 9999)

或文件

DataStream<String> lines = env.readTextFile("file:///path")

在实际应用中,最常用的数据源是那些支持低延迟,高吞吐量并行度去以及倒带和重放的数据源 - 高性能和容错的先决条件 - 例如Apache Kafka, Kinesis 以及各种文件系统。REST APIs和数据库也经常用于丰富流。

基本流下沉

上例使用adults.print()来显示结果到任务管理器的日志中(如果运行在IDE上则会出现在IDE的控制台中)。这个方法会为流中的每个元素调用toString()

输出看上去是这样的:

1> Fred: age 35
2> Wilma: age 35

1> 和 2> 指出了产生输出的子任务

你也可以写到文本文件

stream.writeAsText("/path/to/file")

或者CSV文件

stream.writeAsCsv("/path/to/file")

或者套接字

stream.writeToSocket(host, port, SerializationSchema)

在生产中,常用的接收器包括Kafka以及各种数据库和文件系统。

调试

在生产中,你将向应用程序运行的远程集群提交应用程序JAR文件。如果失败,远程也会失败。作业管理器和任务管理器日志在调试此类故障时非常游泳,但在IDE内部进行本地调试要容易的多,这是Flink支持的。你也可以设置断点,检查局部变量,并逐步执行代码。你也可以进入Flink代码,如果你想了解Flink的工作原理,这可能是了解更多内部信息的好方法。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
ResizeObserver loop completed with undelivered notifications
ResizeObserver loop completed with undelivered notifications
|
6月前
【Bug】ERROR ResizeObserver loop completed with undelivered notifications.
【Bug】ERROR ResizeObserver loop completed with undelivered notifications.
|
程序员 Go API
译|Don’t just check errors, handle them gracefully(一)
译|Don’t just check errors, handle them gracefully
80 0
Error:svn:E155037:Previous operation has not finished; run ‘cleanup‘ if it was interrupted(完美解决)
Error:svn:E155037:Previous operation has not finished; run ‘cleanup‘ if it was interrupted(完美解决)
411 0
Error:svn:E155037:Previous operation has not finished; run ‘cleanup‘ if it was interrupted(完美解决)
2015-03-17 current note creation logic in my task
2015-03-17 current note creation logic in my task
113 0
2015-03-17 current note creation logic in my task
|
安全 Unix
|
存储 数据库
svn报错:“Previous operation has not finished; run &#39;cleanup&#39; if it was interrupted“
之前也遇到过这个问题,不过让朋友帮忙解决的。这次又碰上了,记不起怎么弄的来了。       这是在网上查的方法。   本地.svn\wc.db数据库文件里面存储了svn的operation,表名是work_queue。
1052 0
|
数据库 数据库管理 开发工具
SVN:Previous operation has not finished; run 'cleanup' if it was interrupted
异常处理汇总-开发工具  http://www.cnblogs.com/dunitian/p/4522988.html cleanup failed to process the following paths:xxx Previous operation has not finished; run 'cleanup' if it was interrupted 解决方法有两个,一个是用sqlite清除下数据库wc.db的work_queue,这种网上说的比较多。
1246 0