Kafka's consumer auto offset commit: on what schedule does it commit the consumed offsets? Answer: at a fixed time interval (enable.auto.commit=true, interval controlled by auto.commit.interval.ms). Offsets can also be committed manually.
Where are the offsets committed to? Whether they are auto-committed or committed via consumer.commitAsync()/commitSync(), they go to Kafka's internal __consumer_offsets topic. You can also take full control and store the (topicPartition, offset) pairs in any custom storage system of your own.
The Kafka consumer has no transaction support of its own; the Kafka producer does support transactions: producer.beginTransaction(), producer.send(), producer.commitTransaction(), producer.abortTransaction().
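A minimal, self-contained sketch of that producer transaction sequence (the broker address, topic name and transactional.id below are placeholder values, not from these notes):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TxProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");                  // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("transactional.id", "demo-tx-1");                    // required for transactions

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();                                   // must be called once before the first transaction
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.commitTransaction();                              // makes the writes visible atomically
        } catch (Exception e) {
            producer.abortTransaction();                               // rolls back everything sent in this transaction
        } finally {
            producer.close();
        }
    }
}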
In Spark, map(element => ...) invokes the user function once for every element,
while mapPartitions(iter => ...) invokes it only once per partition. The two operators differ in how they invoke the user function; a short Java sketch follows.
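A minimal Java sketch of that difference (the local master setting and RDD contents are made up for illustration):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MapVsMapPartitions {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("MapVsMapPartitions").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // map: the user function runs once for every element
        JavaRDD<Integer> doubled = rdd.map(x -> x * 2);

        // mapPartitions: the user function runs once per partition and receives the whole iterator,
        // so per-partition setup (e.g. opening a connection) happens only once per partition
        JavaRDD<Integer> doubled2 = rdd.mapPartitions((Iterator<Integer> iter) -> {
            List<Integer> out = new ArrayList<>();   // per-partition setup would go here
            while (iter.hasNext()) {
                out.add(iter.next() * 2);
            }
            return out.iterator();
        });

        System.out.println(doubled.collect());
        System.out.println(doubled2.collect());
        sc.close();
    }
}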
Demo of the batch-processing (DataSet) API
package com.atguigu.wc; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class _02_WordCountpricatice { public static void main(String[] args) throws Exception { ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> stringDataSource = batchEnv.readTextFile("file:///代码区//Fink代码//FlinkTutorial//input//word.txt"); //在dataset上调用各种dataset的算子 stringDataSource.flatMap(new MyFlatMapFuncations()) .groupBy(0).sum(1).print(); } } //接口的实现,除了匿名内部类的方式,还可以用什么方式 class MyFlatMapFuncations implements FlatMapFunction<String, Tuple2<String,Integer>>{ public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = value.split("\\s+"); for (String word : words) { collector.collect(Tuple2.of(word,1)); } } }
Flink's unified stream/batch API (RuntimeExecutionMode)
Spark's reduceByKey automatically does partial (map-side) aggregation on the upstream side.
MapReduce can likewise be given a CombinerClass to do partial aggregation on the map side (see the driver sketch below).
In batch processing, having the upstream pre-aggregate locally and then spill the shuffle output to disk for the downstream is perfectly feasible.
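For the MapReduce side of that comparison, a minimal driver sketch (TokenizerMapper and IntSumReducer are assumed word-count classes, not defined here):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombinerDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount");
        job.setJarByClass(CombinerDriverSketch.class);
        job.setMapperClass(TokenizerMapper.class);      // assumed word-count mapper (not shown)
        job.setCombinerClass(IntSumReducer.class);      // map-side partial aggregation before the shuffle
        job.setReducerClass(IntSumReducer.class);       // assumed word-count reducer (not shown)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}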
Batch mode and streaming mode can compute the same requirement in very different ways under the hood.
Flink's unified stream/batch execution: you write the job once against the streaming API, and the runtime can execute it in streaming mode or automatically convert it to batch-mode execution.
package com.atguigu.day02; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class _03_WordCountpricatice { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setRuntimeMode(RuntimeExecutionMode.BATCH); //按批计算 // env.setRuntimeMode(RuntimeExecutionMode.STREAMING); //按流计算 DataStreamSource<String> streamSource = env.readTextFile("FlinkTutorial/input/wc.txt"); streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = string.split("\\s+"); for (String word : words) { collector.collect(Tuple2.of(word,1)); } } }) .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }).sum(1).print(); env.execute(); } }
How to use lambda expressions (anonymous functions) in Flink
package com.atguigu.day02; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class _04_WordCount_LambaTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment= StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> streamSource = environment.readTextFile("FlinkTutorial/input/wc.txt"); // streamSource.map(new MapFunction<String, String>() { // @Override // public String map(String string) throws Exception { // return null; // } // }); //lambda表达式怎么写,看你要实现的那个接口的方法接收什么参数,返回什么结果 //然后就按lambda语法来表达,(参数1,参数2,.....)->{ 函数体 } // streamSource.map((value)->{ // return value.toUpperCase(); // }); //至简原则:上面代码只有一行,且其中的方法调用没有参数传递,则可以把方法调用,转成"方法引用" SingleOutputStreamOperator<String> upperCasted = streamSource.map(String::toUpperCase); // upperCasted.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { // @Override // public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception { // // } // }); SingleOutputStreamOperator<Tuple2<String, Integer>> wordOne = upperCasted.flatMap((String s, Collector<Tuple2<String, Integer>> out) -> { String[] words = s.split("\\s+"); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }).returns(new TypeHint<Tuple2<String, Integer>>() {}); // wordOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { // @Override // public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { // return null; // } // }) //泛型是一种参数,确切的来说叫做泛型参数 KeyedStream<Tuple2<String, Integer>, String> keyedBy = wordOne.keyBy( (value) -> value.f0 ); keyedBy.sum(1).print(); environment.execute(); } }
A local execution environment with the Web UI enabled
For comparison, on the Spark side: when running on a standalone cluster you can open the master's web UI (port 8080 by default; 7077 is the master's RPC/submit port, not the UI);
when running locally, the application UI is served at localhost:4040.
package com.atguigu.day02; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class _03_WordCountpricatice { public static void main(String[] args) throws Exception { // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); // env.setRuntimeMode(RuntimeExecutionMode.BATCH); //按批计算 // env.setRuntimeMode(RuntimeExecutionMode.STREAMING); //按流计算 Configuration conf = new Configuration(); conf.setInteger(RestOptions.PORT,8081); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); DataStreamSource<String> streamSource = env.readTextFile("FlinkTutorial/input/wc.txt"); streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = string.split("\\s+"); for (String word : words) { collector.collect(Tuple2.of(word,1)); } } }) .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }).sum(1).print(); env.execute(); } }
Kafka Source (commonly used in production)
Committing the consumed offsets back to Kafka is mainly so that external monitoring systems can track the job's consumption progress, data backlog (lag), and so on in a timely way,
because it is inconvenient for an external monitoring system to read Flink's internal state.
package com.atguigu.day02;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.LongValueSequenceIterator;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class _05_SourceOperator_Demos {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> fromElements = env.fromElements(1, 2, 3, 4, 5);
        // fromElements.map(d -> d * 10).print();

        List<String> datas = Arrays.asList("a", "b", "c", "d");
        // the source operator returned by fromCollection is a single-parallelism operator
        DataStreamSource<String> fromCollection = env.fromCollection(datas);
        // DataStreamSource<LongValue> paralleval = env.fromParallelCollection(new LongValueSequenceIterator(1, 100), TypeInformation.of(Long.class));
        fromCollection.map(String::toUpperCase).print();

        DataStreamSource<Long> sequce = env.generateSequence(1, 100);
        sequce.map(x -> x - 1).print();

        /* get a data stream from a file */
        DataStreamSource<String> files = env.readTextFile("FlinkTutorial/input/wc.txt", "utf-8");
        files.map(String::toUpperCase).print();

        String path = "file:///Users/xing/Desktop/a.txt";
        // PROCESS_CONTINUOUSLY keeps monitoring the given file/directory, checking every 2 seconds whether it changed
        DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), "FlinkTutorial/input/wc.txt",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
        lines.map(String::toUpperCase).print();

        /* get a data stream by reading from Kafka */
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setTopics("atguiguNew")
                .setGroupId("gb01")
                .setBootstrapServers("node1:9092")
                // OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST):
                //   start from the previously committed offsets (reset to LATEST if none exist)
                // OffsetsInitializer.offsets(Map<TopicPartition, Long>):
                //   start each partition from the offset supplied in the map
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                // .setValueOnlyDeserializer(new DeserializationSchema<String>() {
                //     @Override
                //     public String deserialize(byte[] bytes) throws IOException {
                //         return new String(bytes);
                //     }
                //
                //     @Override
                //     public boolean isEndOfStream(String string) {
                //         return false;
                //     }
                //
                //     @Override
                //     public TypeInformation<String> getProducedType() {
                //         return TypeInformation.of(String.class);
                //     }
                // })
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Enable the underlying Kafka consumer's auto offset commit: it commits the latest consumed
                // offsets to Kafka's __consumer_offsets topic.
                // Even with auto commit enabled, KafkaSource does not rely on it: on crash and restart it
                // prefers the offsets stored in Flink's own state (more reliable).
                // That state only takes effect when the subtask is restarted/recovered.
                .setProperty("enable.auto.commit", "true")
                // .setBounded(OffsetsInitializer.committedOffsets())
                //   Mark this source as BOUNDED: it reads up to the specified position, then stops reading and exits.
                //   Typical use: backfilling or re-running a fixed slice of historical data.
                // .setUnbounded(OffsetsInitializer.latest())
                //   Keep this source UNBOUNDED, but stop reading once the specified position is reached while the
                //   program keeps running. Typical use: read a fixed-length segment from Kafka and then process it
                //   together with another, truly unbounded stream.
                .build();

        // env.addSource(...)  takes an implementation of SourceFunction
        // env.fromSource(...) takes an implementation of the (new) Source interface
        // Adding the source operator to the environment yields the initial stream
        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");
        streamSource.print();
        // 1. Read the topic. From where? From the last committed offset; if there is none, from the latest data.
        //    Deserialize each record into a String, then add the operator to the environment to get the starting stream.

        env.execute();
    }
}
Custom Source
Flink's DataStream API lets developers define custom sources flexibly, according to their actual needs. Essentially you define a class that
implements SourceFunction or extends RichParallelSourceFunction, and implement its run() and cancel() methods.
package com.atguigu.day02;

import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;

public class _06_CustomSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation());
        dataStreamSource.print();

        env.execute();
    }
}

// my Source component; the data it produces is EventLog objects
class MySourceFuncation implements SourceFunction<EventLog> {
    volatile boolean flag = true;

    public void run(SourceContext<EventLog> sourceContext) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events = {"appLaunch", "pageLoad", "adClick", "itemShare", "putBack", "wakeup"};
        HashMap<String, String> eventInfoMap = new HashMap<>();

        while (flag) {
            eventLog.setGuid(RandomUtils.nextLong(1, 1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase());
            eventLog.setTimeStramp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1), RandomStringUtils.randomAlphanumeric(2));
            eventLog.setEventInfo(eventInfoMap);

            sourceContext.collect(eventLog);

            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500, 1500));
        }
    }

    public void cancel() {
        flag = false;
    }
}

// Lombok @Data generates the getters/setters and toString() that the code above relies on
@Data
class EventLog {
    private long guid;
    private String sessionId;
    private String eventId;
    private long timeStramp;
    private Map<String, String> eventInfo;
}
Instead of printing the EventLog object's toString() output, I want to print it in JSON format.
package com.atguigu.day02; import com.alibaba.fastjson.JSON; import lombok.*; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import scala.util.parsing.json.JSONArray$; import java.util.HashMap; import java.util.Map; public class _06_CustomSourceFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation()); dataStreamSource.map(JSON::toJSONString).print(); // dataStreamSource.print(); env.execute(); } } //我的Source组件,要生产的数据是EventLog对象 class MySourceFuncation implements SourceFunction<EventLog>{ volatile boolean flag=true; public void run(SourceContext<EventLog> sourceContext) throws Exception { EventLog eventLog = new EventLog(); String[] events={"appLaunch","pageLoad","adClick","itemShare","putBack","wakeup"}; HashMap<String,String> eventInfoMap=new HashMap<>(); while (flag){ eventLog.setGuid(RandomUtils.nextLong(1,1000)); eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase()); eventLog.setTimeStramp(System.currentTimeMillis()); eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]); eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2)); eventLog.setEventInfo(eventInfoMap); sourceContext.collect(eventLog); eventInfoMap.clear(); Thread.sleep(RandomUtils.nextInt(500,1500)); } } public void cancel() { flag=false; } } class EventLog{ private long guid; private String sessionId; private String eventId; private long timeStramp; private Map<String,String> eventInfo; }
Lifecycle methods in these interfaces
Mapper (MapReduce):
setup() — start of the lifecycle
map() — called for every record during the lifecycle
cleanup() — end of the lifecycle
RichFunction — the lifecycle methods of Flink's "rich" operators: open()/close(), plus getRuntimeContext(); a small sketch follows
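A minimal sketch (not from the notes) of a rich transformation operator, showing where open(), close() and getRuntimeContext() fit into that lifecycle; the upper-casing map logic is only a placeholder:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// open() runs once per parallel subtask before any record is processed,
// map() runs once per record, close() runs once when the subtask shuts down
public class UpperCaseRichMap extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // one-time initialization (e.g. opening a connection); the runtime context is available here
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        System.out.println("open() on subtask " + subtaskIndex);
    }

    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }

    @Override
    public void close() throws Exception {
        // one-time cleanup (e.g. closing the connection)
        System.out.println("close()");
    }
}

It is used like any plain MapFunction, e.g. stream.map(new UpperCaseRichMap()).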
Spark's taskSet -> Flink: task
A concrete parallel task instance in Spark -> Flink: subtask
package com.atguigu.day02; import com.alibaba.fastjson.JSON; import lombok.*; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import scala.util.parsing.json.JSONArray$; import java.util.HashMap; import java.util.Map; public class _06_CustomSourceFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation()); dataStreamSource.map(JSON::toJSONString).print(); // dataStreamSource.print(); DataStreamSource<EventLog> dataStreamSource1 = env.addSource(new MyRichSourceFunction()); dataStreamSource1.map(JSON::toJSONString).print(); env.execute(); } } //我的Source组件,要生产的数据是EventLog对象 class MySourceFuncation implements SourceFunction<EventLog>{ volatile boolean flag=true; public void run(SourceContext<EventLog> sourceContext) throws Exception { EventLog eventLog = new EventLog(); String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"}; HashMap<String,String> eventInfoMap=new HashMap<>(); while (flag){ eventLog.setGuid(RandomUtils.nextLong(1,1000)); //字母组成的随机数 eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase()); eventLog.setTimeStramp(System.currentTimeMillis()); eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]); eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2)); eventLog.setEventInfo(eventInfoMap); sourceContext.collect(eventLog); eventInfoMap.clear(); Thread.sleep(RandomUtils.nextInt(500,1500)); } } public void cancel() { flag=false; } } class MyRichSourceFunction extends RichSourceFunction<EventLog>{ volatile boolean flag=true; //source组件初始化 public void open(Configuration parameters) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); // 可以从运行时上下文中,取到本算子所属的 task 的task名 String taskName = runtimeContext.getTaskName(); // 可以从运行时上下文中,取到本算子所属的 subTask 的subTaskId int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); } //source组件生成数据的过程(核心工作逻辑) public void run(SourceContext<EventLog> ctx) throws Exception { EventLog eventLog = new EventLog(); String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"}; HashMap<String,String> eventInfoMap = new HashMap<>(); while (flag){ eventLog.setGuid(RandomUtils.nextLong(1,1000)); eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase()); eventLog.setTimeStramp(System.currentTimeMillis()); eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]); eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2)); eventLog.setEventInfo(eventInfoMap); ctx.collect(eventLog); eventInfoMap.clear(); Thread.sleep(RandomUtils.nextInt(500,1500)); } } // job取消调用的方法 public void cancel() { flag=false; } //组件关闭调用的方法 public void close() throws Exception { System.out.println("组件被关闭了....."); } } class EventLog{ private long guid; private String 
sessionId; private String eventId; private long timeStramp; private Map<String,String> eventInfo; }
Both source types above are single-parallelism sources; their parallelism cannot be set greater than 1.
The ones below do support it, e.g. .setParallelism(2).
// Custom sources
// * You can implement SourceFunction or RichSourceFunction; both are non-parallel source operators
// * You can also implement ParallelSourceFunction or RichParallelSourceFunction; both are parallelizable source operators
// *
// * -- The "Rich" variants all have open(), close() and getRuntimeContext()
// * -- The "Parallel" variants can all run as multiple parallel instances
package com.atguigu.day02; import com.alibaba.fastjson.JSON; import lombok.*; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import scala.util.parsing.json.JSONArray$; import java.util.HashMap; import java.util.Map; public class _06_CustomSourceFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation()); // dataStreamSource.map(JSON::toJSONString).print(); // dataStreamSource.print(); DataStreamSource<EventLog> dataStreamSource1 = env.addSource(new MyRichSourceFunction()); dataStreamSource1.map(JSON::toJSONString).print(); env.execute(); } } //我的Source组件,要生产的数据是EventLog对象 class MySourceFuncation implements SourceFunction<EventLog>{ volatile boolean flag=true; public void run(SourceContext<EventLog> sourceContext) throws Exception { EventLog eventLog = new EventLog(); String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"}; HashMap<String,String> eventInfoMap=new HashMap<>(); while (flag){ eventLog.setGuid(RandomUtils.nextLong(1,1000)); //字母组成的随机数 eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase()); eventLog.setTimeStramp(System.currentTimeMillis()); eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]); eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2)); eventLog.setEventInfo(eventInfoMap); sourceContext.collect(eventLog); eventInfoMap.clear(); Thread.sleep(RandomUtils.nextInt(500,1500)); } } public void cancel() { flag=false; } } class MyParallelSourceFunction implements ParallelSourceFunction<EventLog> { public void run(SourceContext<EventLog> ctx) throws Exception { } public void cancel() { } } class MyRichParallelSourceFuncation extends RichParallelSourceFunction<EventLog>{ public void run(SourceContext<EventLog> ctx) throws Exception { } public void cancel() { } } class MyRichSourceFunction extends RichSourceFunction<EventLog>{ volatile boolean flag=true; //source组件初始化 public void open(Configuration parameters) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); // 可以从运行时上下文中,取到本算子所属的 task 的task名 String taskName = runtimeContext.getTaskName(); // 可以从运行时上下文中,取到本算子所属的 subTask 的subTaskId int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); } //source组件生成数据的过程(核心工作逻辑) public void run(SourceContext<EventLog> ctx) throws Exception { EventLog eventLog = new EventLog(); String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"}; HashMap<String,String> eventInfoMap = new HashMap<>(); while (flag){ eventLog.setGuid(RandomUtils.nextLong(1,1000)); eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase()); 
eventLog.setTimeStramp(System.currentTimeMillis()); eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]); eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2)); eventLog.setEventInfo(eventInfoMap); ctx.collect(eventLog); eventInfoMap.clear(); Thread.sleep(RandomUtils.nextInt(500,1500)); } } // job取消调用的方法 public void cancel() { flag=false; } //组件关闭调用的方法 public void close() throws Exception { System.out.println("组件被关闭了....."); } } class EventLog{ private long guid; private String sessionId; private String eventId; private long timeStramp; private Map<String,String> eventInfo; }
Basic transformation operators
Mapping operators
map — mapping (DataStream → DataStream)
map(new MapFunction)
MapFunction: (x) -> y
[1 record in, 1 record out]
public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // fromElements creates a non-parallel DataStreamSource
        DataStreamSource<String> words = env.fromElements(
                "hadoop", "spark", "flink", "hbase", "flink", "spark"
        );

        // pass a MapFunction implementation to map() and override its map method
        DataStream<String> upperWords = words.map(new MapFunction<String, String>() {
            public String map(String value) throws Exception {
                // convert every word to upper case
                return value.toUpperCase();
            }
        });

        // print the result to the console via the print sink
        upperWords.print();
        env.execute("MapDemo");
    }
}
flatMap — flattening map (DataStream → DataStream)
flatMap(new FlatMapFunction)
FlatMapFunction: x -> x1, x2, x3, x4 [1 record in, multiple records out, flattened]
DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
        new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                // split the line into a String array on spaces
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // lower-case the word and emit it through the Collector
                    out.collect(Tuple2.of(word.toLowerCase(), 1));
                }
            }
        }
);
If you pass a lambda expression to flatMap, you must call returns() after flatMap to declare the produced data type;
otherwise Flink cannot infer the return type (the generic information is erased) and throws an exception.
DataStream<Tuple2<String, Integer>> wAndOne = lines.flatMap(
        (String line, Collector<Tuple2<String, Integer>> out) -> {
            Arrays.asList(line.split("\\W+")).forEach(word -> {
                out.collect(Tuple2.of(word.toLowerCase(), 1));
            });
        }
).returns(Types.TUPLE(Types.STRING, Types.INT)); // use returns() to declare the produced data type
project — projection (DataStream → DataStream)
This operator only works on Tuple-typed data; project() is similar to "select <fields>" in SQL.
Only the Java API has this method; the Scala API does not.
// generate data with fromElements; each Tuple3 holds name, gender and age
DataStreamSource<Tuple3<String, String, Integer>> users = env.fromElements(
        Tuple3.of("佩奇", "女", 5),
        Tuple3.of("乔治", "男", 3)
);
// project can only be applied to a DataStream of Tuples; keep only the name (index 0) and age (index 2)
DataStream<Tuple> result = users.project(0, 2);
Filtering operators
filter — filter (DataStream → DataStream)
filter(new FilterFunction)
FilterFunction: x -> true/false
DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// drop the odd numbers and keep the even ones
DataStream<Integer> even = numbers.filter(new FilterFunction<Integer>() {
    public boolean filter(Integer value) throws Exception {
        return value % 2 == 0; // elements for which filter() returns false are dropped
    }
});
Demo of the map operator
package com.atguigu.day02;

/*
 * API demos of various transformation operators
 */

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;
import java.util.List;

public class _07_Transformation_Demos {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // {"uid":1,"name":"zs","friends":[{"fid":2,"name":"aa"},{"fid":3,"name":"bb"}]}
        DataStreamSource<String> streamSource = env.fromElements(
                "{\"uid\":1,\"name\":\"zs\",\"friends\":[{\"fid\":2,\"name\":\"aa\"},{\"fid\":3,\"name\":\"bb\"}]}",
                "{\"uid\":2,\"name\":\"ls\",\"friends\":[{\"fid\":1,\"name\":\"cc\"},{\"fid\":2,\"name\":\"aa\"}]}",
                "{\"uid\":3,\"name\":\"ww\",\"friends\":[{\"fid\":2,\"name\":\"aa\"}]}",
                "{\"uid\":4,\"name\":\"zl\",\"friends\":[{\"fid\":3,\"name\":\"bb\"}]}",
                "{\"uid\":5,\"name\":\"tq\",\"friends\":[{\"fid\":2,\"name\":\"aa\"},{\"fid\":3,\"name\":\"bb\"}]}"
        );

        // map each JSON string to a javabean
        SingleOutputStreamOperator<UserInfo> beanStream =
                streamSource.map(json -> JSON.parseObject(json, UserInfo.class));
        beanStream.print();

        env.execute();
    }
}

// Lombok @Data generates the getters/setters that fastjson needs to populate the bean
@Data
class UserInfo implements Serializable {
    private int uid;
    private String name;
    private List<FriendInfo> friends;
}

@Data
class FriendInfo implements Serializable {
    private int fid;
    private String name;
}