Flink DataStream API Programming Guide


Example Program

The following program is a complete, working example of a streaming window word count application that counts the words coming from a web socket in 5-second windows.

 

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }
    
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
    
}

The overall structure of a Flink application is as follows.

Flink DataStream programs look like regular Java programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

 

Walking through this example:

First, a socketTextStream is created to read the text stream from a socket.

Next comes a flatMap. The difference from map is that map is 1 -> 1 while flatMap is 1 -> n; the Splitter here splits the text on spaces and emits each word as a tuple.

Then keyBy produces a keyed tuple stream, keyed by the word.

A 5-second timeWindow is applied, and the counts are summed within it.

Finally, the output is print.

 

Transformations

The most common transformations are not listed here.

==============================================================================================

Reduce 
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

 

Fold 
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

DataStream<String> result = 
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

 

The difference between fold and reduce: fold takes an initial value, and the FoldFunction can fold one type into another, whereas a ReduceFunction always works within a single type.

 

Aggregations 
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. 
The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

These can be seen as special cases of reduce: the variants without "By" return only the value, while the "By" variants return the whole element.

=============================================================================================

Union 
DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...);

 

Connect 
DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

With connect, two streams of different types are combined into one logical stream, so a downstream operator can receive data from both streams at the same time (as the sketch below shows).
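A minimal sketch of the shared-state idea, reusing the connectedStreams variable from the snippet above; the threshold field and the output messages are purely illustrative.

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

    // state shared by both inputs within one parallel subtask
    private int threshold = 0;

    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        // data stream: emit values above the current threshold
        if (value > threshold) {
            out.collect("above threshold: " + value);
        }
    }

    @Override
    public void flatMap2(String control, Collector<String> out) {
        // control stream: update the shared threshold
        threshold = Integer.parseInt(control);
    }
});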

 

CoMap, CoFlatMap 
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

 

Split 
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

 

Select 
SplitStream → DataStream

Select one or more streams from a split stream.

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

 

====================================================================================

 

Project 
DataStream → DataStream

Selects a subset of fields from the tuples

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

 

===========================================================================================

Window 
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data

A window defined on a KeyedStream.

 

WindowAll 
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data

Note that, because there is no key, a transformation on the "all" window cannot be parallelized; it has to run in a single task.

 

Window Apply 
WindowedStream → DataStream 
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
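The guide's code for this example is omitted above; the following is a reconstruction sketch, assuming a WindowedStream of Tuple2<String, Integer> keyed by a tuple key and the WindowFunction signature of this Flink version.

windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    @Override
    public void apply(Tuple key,
                      Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        // manually sum the Integer field of every element in the window
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});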

 

Window Reduce 
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.
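For example, a minimal sketch that sums the Integer field of Tuple2<String, Integer> elements within each window (the tuple type is an assumption for illustration):

windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                          Tuple2<String, Integer> value2) {
        // keep the key, add up the counts
        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
    }
});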

 

Aggregations on windows 
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

windowedStream.sum(0);
windowedStream.sum("key");

 

Window Join 
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

dataStream.join(otherStream)
    .where(0).equalTo(1)
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply (new JoinFunction () {...});
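The JoinFunction above is elided; a hedged, purely illustrative filling might look like this, assuming two Tuple2 streams whose matching String fields are field 0 and field 1 respectively:

dataStream.join(otherStream)
    .where(0).equalTo(1)
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, String>() {
        @Override
        public String join(Tuple2<String, Integer> first, Tuple2<Integer, String> second) {
            // combine one element from each side that share the same key and window
            return first.f0 + " -> " + (first.f1 + second.f0);
        }
    });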

 

 

Physical partitioning

Similar to Storm's grouping strategies; the partitioning can be configured explicitly.

Hash partitioning, equivalent to grouping by field 
DataStream → DataStream

Identical to keyBy but returns a DataStream instead of a KeyedStream.

dataStream.partitionByHash("someKey");
dataStream.partitionByHash(0);

 

Custom partitioning 
DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(new Partitioner(){...}, "someKey");
dataStream.partitionCustom(new Partitioner(){...}, 0);
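A minimal sketch of such a Partitioner, assuming a String key field named "someKey" as in the snippet above; the hashing scheme is purely illustrative:

dataStream.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        // route each key to a fixed target task based on its hash
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}, "someKey");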

 

Random partitioning, equivalent to shuffle 
DataStream → DataStream

Partitions elements randomly according to a uniform distribution.

dataStream.partitionRandom();

 

Rebalancing (Round-robin partitioning) 
DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

dataStream.rebalance();

This prevents data skew; round-robin means records are handed out one at a time, each partition in turn.

 

Broadcasting, equivalent to global 
DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast();

 

Task chaining and resource groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. 
Flink by default chains operators if this is possible (e.g., two subsequent map transformations).

The API gives fine-grained control over chaining if desired:

A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.

Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...);

Note that startNewChain applies to the operator on its left, so in the example above the new chain starts with the first map.

 

Disable chaining

Do not chain the map operator

someStream.map(...).disableChaining();

Start a new resource group

Start a new resource group containing the map and the subsequent operators.

someStream.map(...).startNewResourceGroup();
Meaning that they then share the same slot?

Isolate resources

Isolate the operator in its own slot.

someStream.map(...).isolateResources();

Uses a dedicated slot.

 

Execution Configuration

Only the following two settings differ from the batch configuration; a short sketch follows the list.

Parameters in the ExecutionConfig that pertain specifically to the DataStream API are:

  • enableTimestamps() / disableTimestamps(): Attach a timestamp to each event emitted from a source. areTimestampsEnabled() returns the current value.

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval().
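A minimal sketch of setting these two options on the ExecutionConfig, using the method names quoted above (the 200 ms interval is an arbitrary example):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();
config.enableTimestamps();                // attach timestamps to events emitted from sources
config.setAutoWatermarkInterval(200);     // emit watermarks automatically every 200 ms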

 

Debugging

A LocalEnvironment is created and used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

 

Collection data sources can be used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

 

Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:

import org.apache.flink.contrib.streaming.DataStreamUtils;

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);

 

Windows

 

Working with Time

Three notions of time:

Processing time: the wall-clock time at which the data is actually processed.

Event time: the time at which the event actually occurred.

Ingestion time: the time at which the data enters Flink, assigned at the data source.

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

The default is processing time.

To use event time, you need to follow these steps:

  • Set env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  • Use DataStream.assignTimestamps(...) in order to tell Flink how timestamps relate to events (e.g., which record field is the timestamp)

  • Set enableTimestamps(), as well as the interval for watermark emission (setAutoWatermarkInterval(long milliseconds)), in the ExecutionConfig.

 

For example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned by the system that generates these data streams), and we know that the lag between the current processing time and the timestamp of an event is never more than 1 second:

DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
stream.assignTimestamps(new TimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
    @Override
    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        return element.f0;
    }

    @Override
    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        return element.f0 - 1000;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
});

 

Basic Window Constructs

Tumbling time window (non-sliding) 
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "tumbles".

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));

 

Sliding time window 
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "slides" by 1 second.

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));

Tumbling count window 
KeyedStream → WindowedStream

keyedStream.countWindow(1000);

Sliding count window 
KeyedStream → WindowedStream

keyedStream.countWindow(1000, 100)

 

Advanced Window Constructs

The general recipe for building a custom window is to specify (1) a WindowAssigner, (2) a Trigger (optionally), and (3) an Evictor (optionally).

Constructs such as timeWindow above are pre-packaged; building a window the advanced way takes three steps.

1. First the WindowAssigner, which comes mainly in two flavours, sliding and non-sliding; it answers the "where" question, i.e. which window an element is assigned to.

Global window 
KeyedStream → WindowedStream

All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.

stream.window(GlobalWindows.create());

Used for count windows.

Tumbling time windows 
KeyedStream → WindowedStream

stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)));
The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with value higher than its end-value is received, 
whereas for processing time when the current processing time exceeds its current end value.

About the default trigger: 
first understand what a watermark means: receiving a watermark means that no data with an event time smaller than that watermark will arrive anymore. 
So once a received watermark is greater than the window's end time, the window's data is complete and the trigger can fire.

 

Sliding time windows 
KeyedStream → WindowedStream

stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)));

The default trigger is the same as above.

 

2. The second step is to define the Trigger, i.e. when to fire; it answers the "when" question.

The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated ("fires") for each window. 
If a trigger is not specified, a default trigger for each window type is used (that is part of the definition of the WindowAssigner).

 

Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(ProcessingTimeTrigger.create());

 

Watermark trigger

A window is fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(EventTimeTrigger.create());

Continuous processing time trigger

A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

Continuous watermark time trigger

A window is periodically considered for being fired (every 5 seconds in the example). A window is actually fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

The difference from the triggers above is that after firing, the window is not discarded but retained, and it fires again periodically.

 

Count trigger

A window is fired when it has more than a certain number of elements (1000 below). The elements of the triggered window are retained.

windowedStream.trigger(CountTrigger.of(1000));

Fires based on count; the window contents are retained.

 

Purging trigger

Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.

windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));

Some of the triggers above retain the elements; if you want them discarded instead, wrap the trigger in a PurgingTrigger.

 

Delta trigger

A window is periodically considered for being fired (every 5000 milliseconds in the example). A window is actually fired when the value of the last added element exceeds the value of the first element inserted in the window according to a `DeltaFunction`.

windowedStream.trigger(DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldValue, Double newValue) {
        return newValue - oldValue;
    }
}));

With the delta trigger, getDelta compares each newly arrived value against the old value; when the delta exceeds the defined threshold, the window fires.

 

3. Finally, specify the Evictor.

After the trigger fires, and before the function (e.g., sum, count) is applied to the window contents, an optional Evictor removes some elements from the beginning of the window before the remaining elements are passed on to the function.

Put simply, when a window fires we can choose to process only part of its data; the evictor removes some elements and keeps the ones you want.

Time evictor

Evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second).

triggeredStream.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)));

Count evictor

Retain 1000 elements from the end of the window backwards, evicting all others.

triggeredStream.evictor(CountEvictor.of(1000));

The logic is about what to keep rather than what to remove: CountEvictor.of(1000) keeps the last 1000 elements, which can be a little counter-intuitive.

 

Delta evictor

Starting from the beginning of the window, evict elements until an element with value lower than the value of the last element is found (by a threshold and a DeltaFunction).

triggeredStream.evictor(DeltaEvictor.of(5000, new DeltaFunction<Double>() {
  @Override
  public double getDelta(Double oldValue, Double newValue) {
      return newValue - oldValue;
  }
}));

 

Recipes for Building Windows

Below, the guide gives some example window definitions; they are fairly basic.

[Table of window-construction recipes from the original guide omitted.]

 

Windows on Unkeyed Data Streams

Windows can also be used on unkeyed data streams.

The difference is that "All" is appended to the window call.

Tumbling time window all 
DataStream → WindowedStream

Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS));

Sliding time window all 
DataStream → WindowedStream

Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at least 4 seconds) The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));

Tumbling count window all 
DataStream → WindowedStream

Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.

nonKeyedStream.countWindowAll(1000)

Sliding count window all 
DataStream → WindowedStream

Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at least 900 elements).

nonKeyedStream.countWindowAll(1000, 100)

 

Working with State

All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators.

You can make every transformation (map, filter, etc.) stateful by declaring local variables or using Flink’s state interface.

You can register any local variable as managed state by implementing an interface.

In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

The end effect is that updates to any form of state are the same under failure-free execution and execution under failures.

First, we look at how to make local variables consistent under failures, and then we look at Flink’s state interface.

By default state checkpoints will be stored in-memory at the JobManager. For proper persistence of large state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system), which can be configured in the flink-conf.yaml or via StreamExecutionEnvironment.setStateBackend(…).

This is the core value of Flink's streaming model: local state can be checkpointed conveniently, in several ways described below.

By default these checkpoints are stored in the JobManager's memory, but checkpointing to a file system can also be configured, for example as sketched below.
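A minimal sketch of switching to a file-system state backend, assuming the FsStateBackend class and a hypothetical HDFS path (run inside a main(...) that throws Exception):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
env.enableCheckpointing(5000); // draw a consistent snapshot every 5 seconds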

 

Checkpointing Local Variables

This one is easy to understand.

Local variables can be checkpointed by using the Checkpointed interface.

When the user-defined function implements the Checkpointed interface, the snapshotState(…) and restoreState(…) methods will be executed to draw and restore function state.

public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {

    // persistent counter
    private long counter = 0;

    @Override
    public Long reduce(Long value1, Long value2) {
        counter++;
        return value1 + value2;
    }

    // regularly persists state during normal operation
    @Override
    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
        return counter;
    }

    // restores state on recovery from failure
    @Override
    public void restoreState(Long state) {
        counter = state;
    }
}

As shown above, simply implementing snapshotState and restoreState is enough to checkpoint the local variable counter, which is straightforward.

In addition to that, user functions can also implement the CheckpointNotifier interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.

Besides that, you can implement CheckpointNotifier so that notifyCheckpointComplete is called when a checkpoint completes, although delivery of the notification is not guaranteed; a sketch follows.
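A hedged sketch extending the CounterSum example above with CheckpointNotifier (interface and method name as quoted above; imports omitted as elsewhere in this guide):

public class NotifyingCounterSum implements ReduceFunction<Long>, Checkpointed<Long>, CheckpointNotifier {

    private long counter = 0;

    @Override
    public Long reduce(Long value1, Long value2) {
        counter++;
        return value1 + value2;
    }

    @Override
    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
        return counter;
    }

    @Override
    public void restoreState(Long state) {
        counter = state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // called once the checkpoint is fully acknowledged; may be skipped on failure,
        // so later notifications must subsume any missed ones
        System.out.println("checkpoint " + checkpointId + " completed");
    }
}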

 

Using the Key/Value State Interface

This is the explicit use of the state interface.

The state interface gives access to key/value states, which are a collection of key/value pairs. 
Because the state is partitioned by the keys (distributed across workers), it can only be used on the KeyedStream, created via stream.keyBy(…) (which means also that it is usable in all types of functions on keyed windows).

The handle to the state can be obtained from the function’s RuntimeContext
The state handle will then give access to the value mapped under the key of the current record or window - each key consequently has its own value.

The following code sample shows how to use the key/value state inside a reduce function. 
When creating the state handle, one needs to supply a name for that state (a function can have multiple states of different types), the type of the state (used to create efficient serializers), and the default value (returned as a value for keys that do not yet have a value associated).

public class CounterSum extends RichReduceFunction<Long> {

    /** The state handle */
    private OperatorState<Long> counter;

    @Override
    public Long reduce(Long value1, Long value2) {
        counter.update(counter.value() + 1);
        return value1 + value2;
    }

    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}

 

State updated by this is usually kept locally inside the Flink process (unless one configures explicitly an external state backend). This means that lookups and updates are process-local and thus very fast.

The important implication of having the keys set implicitly is that it forces programs to group the stream by key (via the keyBy() function), making the key partitioning transparent to Flink. That allows the system to efficiently restore and redistribute keys and state.

The Scala API has shortcuts for stateful map() or flatMap() functions on KeyedStream, which pass the state of the current key as an Option directly into the function and return the result together with a state update:

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

 

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

For iterative jobs, i.e. those with cycles, checkpointing is more involved, and on recovery the in-flight progress in the loop is lost: for example, with n iterations, if a failure occurs at iteration n-1, the computation effectively starts over from iteration 1.

 

 

Iterations

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);
        
IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

Walking through the example:

First, someIntegers is a DataStream of the integers 0 to 1000.

Each element repeatedly runs through a map function, which here keeps subtracting one.

When does the iteration end?

That is determined by iteration.closeWith: closeWith takes a filtered stream; elements for which the filter returns true are fed back into the iteration, and elements for which it returns false leave the loop.

The final lessThanZero is the output DataStream that someIntegers produces after the iteration.

 

 

 

 

Connectors

Connectors provide code for interfacing with various third-party systems.

Currently these systems are supported:

To run an application using one of these connectors, additional third party components are usually required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. Docker containers are also provided encapsulating these services to aid users getting started with connectors.

 

Here we only look at Kafka.

Then, import the connector in your maven project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.10.2</version>
</dependency>
A usage example:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
stream.print();

 

What about fault tolerance?

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

The principle is that all Kafka partition offsets are checkpointed together with the rest of the state, so on recovery consumption can resume from those offsets.

 

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

Because a simple consumer is used, the offsets need to be recorded even when checkpointing is disabled; the usual approach is followed here, committing the Kafka offsets to ZooKeeper.

 

Data can also be written to Kafka using FlinkKafkaProducer.

The FlinkKafkaProducer writes data to a Kafka topic. The producer can specify a custom partitioner that assigns records to partitions.

stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));