分层API
Flink提供了三个分层的API。 如下所示,每个API针对不同的使用场景在简洁性和表达性之间提供了不同的权衡。
1. ProcessFunctions
Flink提供了ProcessFunctions来处理来自一个或两个输入流或在一个窗口中分组的事件的单个事件。ProcessFunctions提供对时间和状态的细粒度控制。ProcessFunction可以任意修改其状态并注册计时器,这些计时器将来会触发回调函数。因此,ProcessFunctions可以根据许多有状态事件驱动的应用程序的需要,实现复杂的每事件业务逻辑。
下面的示例显示一个在KeyedStream上运行并匹配START和END事件的KeyedProcessFunction。接收到START事件时,该函数会记住其时间戳状态,并在四个小时内注册一个计时器。如果在计时器触发之前收到END事件,则该函数将计算END和START事件之间的持续时间,清除状态并返回该值。否则,计时器将触发并清除状态。
/** * Matches keyed START and END events and computes the difference between * both elements' timestamps. The first String field is the key attribute, * the second String attribute marks START and END events. */ public static class StartEndDuration extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> { private ValueState<Long> startTime; @Override public void open(Configuration conf) { // obtain state handle startTime = getRuntimeContext() .getState(new ValueStateDescriptor<Long>("startTime", Long.class)); } /** Called for each processed event. */ @Override public void processElement( Tuple2<String, String> in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception { switch (in.f1) { case "START": // set the start time if we receive a start event. startTime.update(ctx.timestamp()); // register a timer in four hours from the start event. ctx.timerService() .registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000); break; case "END": // emit the duration between start and end event Long sTime = startTime.value(); if (sTime != null) { out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime)); // clear the state startTime.clear(); } default: // do nothing } } /** Called when a timer fires. */ @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) { // Timeout interval exceeded. Cleaning up the state. startTime.clear(); } }
2. DataStream API
DataStream API为许多常见的流处理操作提供了原语,例如窗口化,一次记录转换和通过查询外部数据存储来丰富事件。DataStream API可用于Java和Scala,并且基于诸如map(),reduce()和aggregate()之类的函数。函数可以通过扩展接口定义,也可以定义为Java或Scala lambda函数。
以下示例显示了如何对点击流进行会话化并计算每个会话的点击次数。
// a stream of website clicks DataStream<Click> clicks = ... DataStream<Tuple2<String, Long>> result = clicks // project clicks to userId and add a 1 for counting .map( // define function by implementing the MapFunction interface. new MapFunction<Click, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Click click) { return Tuple2.of(click.userId, 1L); } }) // key by userId (field 0) .keyBy(0) // define session window with 30 minute gap .window(EventTimeSessionWindows.withGap(Time.minutes(30L))) // count clicks per session. Define function as lambda function. .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
3. SQL & Table API
Flink具有两个关系API,即表API和SQL。这两个API都是用于批处理和流处理的统一API,即查询在无限制的实时流或有限制的记录流上以相同的语义执行,并产生相同的结果。Table API和SQL利用Apache Calcite进行解析,验证和查询优化。它们可以与DataStream和DataSet API无缝集成,并支持用户定义的标量,聚合和表值函数。
Flink的关系API旨在简化数据分析,数据管道和ETL应用程序的定义。
以下示例显示了SQL查询,以对点击流进行会话并计算每个会话的点击次数。这与DataStream API示例中的用例相同。
SELECT userId, COUNT(*) FROM clicks GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId