Flink Introductory Programming (Day 02)

Q: On what schedule does Kafka's automatic consumer offset commit mechanism commit the consumed offsets?
A: At a fixed time interval.
Offsets can also be committed manually; automatic committing is controlled by enable.auto.commit.
Where are the consumed offsets committed to?
With automatic commit, or when calling consumer.commitAsync(), they are committed to the __consumer_offsets topic.
Of course you can also manage offsets yourself and store them in any custom storage system,
as (TopicPartition, offset) pairs.
The Kafka consumer has no transaction API; the Kafka producer does:
producer.beginTransaction()
producer.send()
producer.commitTransaction()
producer.abortTransaction()
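
A minimal sketch of how these producer transaction calls fit together, using the kafka-clients API (the broker address, topic, and transactional.id below are placeholders for illustration):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");   // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "demo-tx-1");     // required before the transaction API can be used

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();                    // must be called once before beginTransaction()
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.commitTransaction();               // all records in the transaction become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();                // discard everything sent within this transaction
        } finally {
            producer.close();
        }
    }
}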

In Spark, map(element => ...) invokes the user function once per element.

mapPartitions(iter => ...) invokes the user function only once per partition. These two operators differ in how they call into the user function (see the sketch below).
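
A minimal sketch of that difference with Spark's Java API (the local SparkContext setup and input data are assumed for illustration; this is Spark code, not Flink):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class MapVsMapPartitionsSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "MapVsMapPartitionsSketch"); // assumed local setup
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // map: the user function runs once for every element
        JavaRDD<Integer> doubled = rdd.map(x -> x * 2);

        // mapPartitions: the user function runs once per partition and receives the whole iterator,
        // so per-partition setup work (e.g. opening a connection) happens only once per partition
        JavaRDD<Integer> doubledByPartition = rdd.mapPartitions(iter -> {
            List<Integer> out = new ArrayList<>();
            while (iter.hasNext()) {
                out.add(iter.next() * 2);
            }
            return out.iterator();
        });

        System.out.println(doubled.collect());
        System.out.println(doubledByPartition.collect());
        sc.stop();
    }
}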

Batch API demo

package com.atguigu.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class _02_WordCountpricatice {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> stringDataSource = batchEnv.readTextFile("file:///代码区//Fink代码//FlinkTutorial//input//word.txt");
        // call the various DataSet operators on the DataSet
        stringDataSource.flatMap(new MyFlatMapFuncations())
                .groupBy(0).sum(1).print();
    }
}
// Besides an anonymous inner class, how else can the interface be implemented? (e.g. a named class as below, or a lambda; see the sketch after this class)
class MyFlatMapFuncations implements FlatMapFunction<String, Tuple2<String,Integer>>{

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
        String[] words = value.split("\\s+");
        for (String word : words) {
            collector.collect(Tuple2.of(word,1));
        }
    }
}
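
One answer to the question above: the same FlatMapFunction can also be supplied as a lambda. A minimal sketch on the DataSet API (the class name and file path here are placeholders); because Java erases the lambda's generic types, a returns(...) hint is required:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class _02_WordCountLambdaSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        batchEnv.readTextFile("FlinkTutorial/input/word.txt")          // placeholder path
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))         // type hint: the lambda's generics are erased
                .groupBy(0)
                .sum(1)
                .print();
    }
}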


Flink's unified stream/batch API (RuntimeExecutionMode)

Spark's reduceByKey automatically performs local (map-side) aggregation upstream.

In MapReduce you can likewise set a CombinerClass to do local aggregation on the map side.

In batch execution it is feasible for the upstream to aggregate locally before the shuffle data is spilled to disk for the downstream.

Batch mode and streaming mode compute differently under the hood, even when implementing the same requirement.

Flink's stream/batch unification: the user writes a single job against the streaming API, and the runtime can execute it in streaming mode or automatically convert it to batch mode.

package com.atguigu.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _03_WordCountpricatice {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // run in batch mode
//        env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // run in streaming mode

        DataStreamSource<String> streamSource = env.readTextFile("FlinkTutorial/input/wc.txt");
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = string.split("\\s+");
                for (String word : words) {
                    collector.collect(Tuple2.of(word,1));
                }
            }
        })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        env.execute();
    }
}


Flink lambda expression usage

package com.atguigu.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class _04_WordCount_LambaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment= StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource =  environment.readTextFile("FlinkTutorial/input/wc.txt");
//        streamSource.map(new MapFunction<String, String>() {
//            @Override
//            public String map(String string) throws Exception {
//                return null;
//            }
//        });
        // how to write the lambda: look at what parameters the interface method takes and what it returns,
        // then express it with lambda syntax: (param1, param2, ...) -> { body }
//        streamSource.map((value)->{
//            return value.toUpperCase();
//        });
        // simplification rule: if the body above is a single statement whose method call takes no extra arguments, the call can be turned into a method reference
        SingleOutputStreamOperator<String> upperCasted = streamSource.map(String::toUpperCase);
//        upperCasted.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
//            @Override
//            public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception {
//
//            }
//        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordOne = upperCasted.flatMap((String s, Collector<Tuple2<String, Integer>> out) -> {
            String[] words = s.split("\\s+");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1));
            }
        }).returns(new TypeHint<Tuple2<String, Integer>>() {});
//        wordOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
//                return null;
//            }
//        })
        // generics are a kind of parameter; strictly speaking, they are type parameters
        KeyedStream<Tuple2<String, Integer>, String> keyedBy = wordOne.keyBy(
                (value) -> value.f0
        );
        keyedBy.sum(1).print();
        environment.execute();

    }
}


Local execution environment with the Web UI enabled

For comparison, Spark's web UI: when running on a cluster you open the cluster UI (the standalone master web UI defaults to port 8080; 7077 is the submission port),

and a locally running application serves its UI at localhost:4040.

package com.atguigu.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;



public class _03_WordCountpricatice {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // run in batch mode
//        env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // run in streaming mode
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT,8081);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);


        DataStreamSource<String> streamSource = env.readTextFile("FlinkTutorial/input/wc.txt");
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = string.split("\\s+");
                for (String word : words) {
                    collector.collect(Tuple2.of(word,1));
                }
            }
        })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        env.execute();
    }
}

Kafka Source (commonly used in production)


Committing offsets back to Kafka is mainly so that external monitoring systems can track the job's consumption progress, data backlog and so on in a timely way,

because it is inconvenient for an external monitoring system to read Flink's internal state.

package com.atguigu.day02;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.LongValueSequenceIterator;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class _05_SourceOperator_Demos {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> fromElements = env.fromElements(1, 2, 3, 4, 5);
//        fromElements.map(d->d*10).print();
        List<String> datas = Arrays.asList("a", "b", "c", "d");
        // the source operator returned by fromCollection is a single-parallelism operator
        DataStreamSource<String> fromCollection = env.fromCollection(datas);
//        DataStreamSource<LongValue> paralleval=env.fromParallelCollection(new LongValueSequenceIterator(1,100), TypeInformation.of(Long.class));
        fromCollection.map(String::toUpperCase).print();
        DataStreamSource<Long> sequce = env.generateSequence(1, 100);
        sequce.map(x->x-1).print();
        /* build a data stream from a file */
        DataStreamSource<String> files = env.readTextFile("FlinkTutorial/input/wc.txt", "utf-8");
        files.map(String::toUpperCase).print();
        String path = "file:///Users/xing/Desktop/a.txt";
        // PROCESS_CONTINUOUSLY keeps monitoring the given file or directory, checking for changes every 2 seconds
        DataStreamSource<String> lines = env.readFile(new TextInputFormat(null),"FlinkTutorial/input/wc.txt",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
        lines.map(String::toUpperCase).print();
        /* build a data stream by reading from Kafka */
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setTopics("atguiguNew")
                .setGroupId("gb01")
                .setBootstrapServers("node1:9092")
                // OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
                //   start from the previously committed offsets (reset to LATEST if none exist)
                // OffsetsInitializer.offsets(Map<TopicPartition, Long>): start each partition from the offset supplied in the map
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
//                                                        .setValueOnlyDeserializer(new DeserializationSchema<String>() {
//                                                            @Override
//                                                            public String deserialize(byte[] bytes) throws IOException {
//                                                                return new String(bytes);
//                                                            }
//
//                                                            @Override
//                                                            public boolean isEndOfStream(String string) {
//                                                                return false;
//                                                            }
//
//                                                            @Override
//                                                            public TypeInformation<String> getProducedType() {
//                                                                return TypeInformation.of(String.class);
//                                                            }
//                                                        })
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // enable the underlying Kafka consumer's automatic offset commit; the latest consumed offsets are committed to Kafka's __consumer_offsets topic
                // even with auto commit enabled, KafkaSource does not rely on it: after a failure and restart it prefers the offsets stored in Flink's own state (more reliable)
                // that state is only available when a subtask is restarted/retried within the running job
                .setProperty("enable.auto.commit", "true").build();
        // setting this source operator to BOUNDED (a bounded stream): the source reads up to the specified position, then stops reading and the job finishes
        // commonly used for backfilling or re-running a slice of historical data
//                .setBounded(OffsetsInitializer.committedOffsets())
        // setting this source operator to UNBOUNDED but with stopping offsets: it stops reading at the specified position, yet the program does not exit
        // main use case: read a fixed-length slice of data from Kafka and process it jointly with another, truly unbounded stream
//        .setUnbounded(OffsetsInitializer.latest())
//        env.addSource(); // takes an implementation class of SourceFunction
        // fromSource takes an implementation of the new Source interface
        // adding the source operator to the environment yields this initial stream
        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),
                "kfk-source");

        streamSource.print();
// 1. read the topic, starting from the previously committed offsets (or from the latest data if there are none); deserialize each record into a String, then add the operator to the environment to obtain this initial stream

        env.execute();
    }
}
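
A minimal sketch of a bounded Kafka source used for backfilling a slice of history (the class name, topic, group id, broker address, and timestamp are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _05_BoundedKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> backfillSource = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092")                                 // placeholder broker
                .setTopics("atguiguNew")                                           // placeholder topic
                .setGroupId("backfill-gb01")                                       // placeholder group id
                .setStartingOffsets(OffsetsInitializer.timestamp(1660000000000L))  // start of the slice (placeholder)
                .setBounded(OffsetsInitializer.latest())                           // stop at the offsets that are latest when the job starts
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(backfillSource, WatermarkStrategy.noWatermarks(), "kafka-backfill")
                .print();

        env.execute(); // the job finishes once every partition reaches its stopping offset
    }
}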


Custom Source

The Flink DataStream API lets developers define a custom Source flexibly according to their needs. In essence you define a class

that implements SourceFunction or extends RichParallelSourceFunction, and implement the run and cancel methods.

package com.atguigu.day02;

import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;

public class _06_CustomSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation());
        dataStreamSource.print();
        env.execute();

    }
}
// my Source component: the data it produces is EventLog objects
class MySourceFuncation implements SourceFunction<EventLog>{
   volatile boolean flag=true;

    @Override
    public void run(SourceContext<EventLog> sourceContext) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events={"appLaunch","pageLoad","adClick","itemShare","putBack","wakeup"};
        HashMap<String,String> eventInfoMap=new HashMap<>();
        while (flag){
            eventLog.setGuid(RandomUtils.nextLong(1,1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase());
            eventLog.setTimeStramp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2));
            eventLog.setEventInfo(eventInfoMap);
            sourceContext.collect(eventLog);
            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500,1500));

        }
    }

    @Override
    public void cancel() {
        flag=false;

    }
}
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
class EventLog{
    private long guid;
    private String sessionId;
    private String eventId;
    private long timeStramp;
    private Map<String,String> eventInfo;
}

Instead of printing the EventLog object's toString result, I want to print it in JSON format.

package com.atguigu.day02;

import com.alibaba.fastjson.JSON;
import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;

public class _06_CustomSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation());
        dataStreamSource.map(JSON::toJSONString).print();
//        dataStreamSource.print();
        env.execute();

    }
}
// my Source component: the data it produces is EventLog objects
class MySourceFuncation implements SourceFunction<EventLog>{
   volatile boolean flag=true;

    @Override
    public void run(SourceContext<EventLog> sourceContext) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events={"appLaunch","pageLoad","adClick","itemShare","putBack","wakeup"};
        HashMap<String,String> eventInfoMap=new HashMap<>();
        while (flag){
            eventLog.setGuid(RandomUtils.nextLong(1,1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase());
            eventLog.setTimeStramp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2));
            eventLog.setEventInfo(eventInfoMap);
            sourceContext.collect(eventLog);
            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500,1500));

        }
    }

    @Override
    public void cancel() {
        flag=false;

    }
}
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
class EventLog{
    private long guid;
    private String sessionId;
    private String eventId;
    private long timeStramp;
    private Map<String,String> eventInfo;
}


Lifecycle methods in the interfaces

MapReduce Mapper:

setup()    lifecycle start

map()      lifecycle body

cleanup()  lifecycle end

RichFunction: the rich operators' lifecycle methods, plus getRuntimeContext()

Spark's taskSet  -> Flink: task

Spark's concrete parallel task instance -> Flink: subtask
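
For illustration, a minimal sketch of the same open/process/close lifecycle on a transformation operator, using a RichMapFunction (the class name and input data are made up for this example):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _06_RichFunctionLifecycleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public void open(Configuration parameters) {
                        // lifecycle start, like Mapper.setup(): called once per subtask before any element is processed
                        int subtask = getRuntimeContext().getIndexOfThisSubtask();
                        System.out.println("open() on subtask " + subtask);
                    }

                    @Override
                    public String map(String value) {
                        // lifecycle body, like Mapper.map(): called once per element
                        return value.toUpperCase();
                    }

                    @Override
                    public void close() {
                        // lifecycle end, like Mapper.cleanup(): called once per subtask when it finishes
                        System.out.println("close()");
                    }
                })
                .print();
        env.execute();
    }
}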

package com.atguigu.day02;

import com.alibaba.fastjson.JSON;
import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;

public class _06_CustomSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation());
        dataStreamSource.map(JSON::toJSONString).print();
//        dataStreamSource.print();
        DataStreamSource<EventLog> dataStreamSource1 = env.addSource(new MyRichSourceFunction());
        dataStreamSource1.map(JSON::toJSONString).print();
        env.execute();

    }
}
// my Source component: the data it produces is EventLog objects
class MySourceFuncation implements SourceFunction<EventLog>{
   volatile boolean flag=true;

    @Override
    public void run(SourceContext<EventLog> sourceContext) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"};
        HashMap<String,String> eventInfoMap=new HashMap<>();
        while (flag){
            eventLog.setGuid(RandomUtils.nextLong(1,1000));
            // random alphanumeric string
            eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase());
            eventLog.setTimeStramp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2));
            eventLog.setEventInfo(eventInfoMap);
            sourceContext.collect(eventLog);
            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500,1500));

        }
    }

    @Override
    public void cancel() {
        flag=false;

    }
}
class MyRichSourceFunction extends RichSourceFunction<EventLog>{
    volatile boolean flag=true;
    // source component initialization
    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        // from the runtime context we can get the name of the task this operator belongs to
        String taskName = runtimeContext.getTaskName();
        // from the runtime context we can get the index of this subtask
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();


    }
    // the source component's data-producing process (core working logic)

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"};
        HashMap<String,String> eventInfoMap = new HashMap<>();
        while (flag){
            eventLog.setGuid(RandomUtils.nextLong(1,1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase());
            eventLog.setTimeStramp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2));
            eventLog.setEventInfo(eventInfoMap);
            ctx.collect(eventLog);
            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500,1500));


        }

    }
    // method called when the job is cancelled

    @Override
    public void cancel() {
        flag=false;

    }
    // method called when the component is closed

    @Override
    public void close() throws Exception {
        System.out.println("the source component has been closed.....");

    }
}

@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
class EventLog{
    private long guid;
    private String sessionId;
    private String eventId;
    private long timeStramp;
    private Map<String,String> eventInfo;
}


Both variants above are single-parallelism sources; their parallelism cannot be set greater than 1.

The variants below can, e.g. .setParallelism(2); a usage sketch follows this comment block.

// Custom sources:
// *    implement SourceFunction or RichSourceFunction: both are non-parallel source operators
// *    or implement ParallelSourceFunction or RichParallelSourceFunction: both are parallelizable source operators
// *
// * -- the Rich variants all have open(), close(), and getRuntimeContext()
// * -- the Parallel variants can run as multiple parallel instances
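
A minimal usage sketch, assuming the MyParallelSourceFunction below has its run() filled in with the same data-generating loop, and reusing the environment and imports from the listing that follows:

        // a parallel source may run with parallelism > 1; each of the 2 subtasks runs its own run() loop
        DataStreamSource<EventLog> parallelStream =
                env.addSource(new MyParallelSourceFunction()).setParallelism(2);
        parallelStream.print();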

package com.atguigu.day02;

import com.alibaba.fastjson.JSON;
import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;

public class _06_CustomSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFuncation());
//        dataStreamSource.map(JSON::toJSONString).print();
//        dataStreamSource.print();
        DataStreamSource<EventLog> dataStreamSource1 = env.addSource(new MyRichSourceFunction());
        dataStreamSource1.map(JSON::toJSONString).print();
        env.execute();

    }
}
// my Source component: the data it produces is EventLog objects
class MySourceFuncation implements SourceFunction<EventLog>{
   volatile boolean flag=true;

    @Override
    public void run(SourceContext<EventLog> sourceContext) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"};
        HashMap<String,String> eventInfoMap=new HashMap<>();
        while (flag){
            eventLog.setGuid(RandomUtils.nextLong(1,1000));
            // random alphanumeric string
            eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase());
            eventLog.setTimeStramp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2));
            eventLog.setEventInfo(eventInfoMap);
            sourceContext.collect(eventLog);
            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500,1500));

        }
    }

    @Override
    public void cancel() {
        flag=false;

    }
}
class  MyParallelSourceFunction  implements ParallelSourceFunction<EventLog> {

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        
    }

    @Override
    public void cancel() {

    }
}
class MyRichParallelSourceFuncation extends RichParallelSourceFunction<EventLog>{

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        
    }

    @Override
    public void cancel() {

    }
}
class MyRichSourceFunction extends RichSourceFunction<EventLog>{
    volatile boolean flag=true;
    // source component initialization
    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        // from the runtime context we can get the name of the task this operator belongs to
        String taskName = runtimeContext.getTaskName();
        // from the runtime context we can get the index of this subtask
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();


    }
    // the source component's data-producing process (core working logic)

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events={"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"};
        HashMap<String,String> eventInfoMap = new HashMap<>();
        while (flag){
            eventLog.setGuid(RandomUtils.nextLong(1,1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphanumeric(12).toLowerCase());
            eventLog.setTimeStramp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphanumeric(1),RandomStringUtils.randomAlphanumeric(2));
            eventLog.setEventInfo(eventInfoMap);
            ctx.collect(eventLog);
            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500,1500));


        }

    }
    // method called when the job is cancelled

    @Override
    public void cancel() {
        flag=false;

    }
    // method called when the component is closed

    @Override
    public void close() throws Exception {
        System.out.println("the source component has been closed.....");

    }
}

@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
class EventLog{
    private long guid;
    private String sessionId;
    private String eventId;
    private long timeStramp;
    private Map<String,String> eventInfo;
}

Basic transformation operators

Mapping operators

map (DataStream -> DataStream)

map(new MapFunction)

MapFunction: x -> y

[1 record in, 1 record out]

public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use env.fromElements to create a non-parallel DataStreamSource
        DataStreamSource<String> words = env.fromElements(
                "hadoop", "spark", "flink", "hbase", "flink", "spark"
        );
        // pass a MapFunction implementation into map and override its map method
        DataStream<String> upperWords = words.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // convert each word to upper case
                return value.toUpperCase();
            }
        });
        // call the print sink to write the data to the console
        upperWords.print();
        env.execute("MapDemo");
    }
}

flatMap (DataStream -> DataStream)

flatMap(new FlatMapFunction)

FlatMapFunction: x -> x1, x2, x3, x4  [1 record in, multiple records out, flattened]

DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
        new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                // split the line into a String array on spaces
                String[] arr = line.split(" ");
                for (String word : arr) {
                    // lower-case the word and emit it into the Collector
                    out.collect(Tuple2.of(word.toLowerCase(), 1));
                }
            }
        }
);

If you pass a lambda expression to flatMap, you must call returns after flatMap to specify the type of the returned data; otherwise Flink cannot infer the return type and an exception occurs.

DataStream<Tuple2<String, Integer>> wAndOne = lines.flatMap(
        (String line, Collector<Tuple2<String, Integer>> out) -> {
            Arrays.asList(line.split("\\W+")).forEach(word -> {
                out.collect(Tuple2.of(word.toLowerCase(), 1));
            });
        }
).returns(Types.TUPLE(Types.STRING, Types.INT)); // use returns to specify the type of the returned data

project (DataStream -> DataStream)

This operator can only be used on Tuple-typed data; project works like selecting columns ("select field") in SQL.

The method exists only in the Java API; the Scala API does not have it.

// use fromElements to generate Tuple3 data representing name, gender, and age
DataStreamSource<Tuple3<String, String, Integer>> users = env.fromElements(
        Tuple3.of("佩奇", "女", 5), Tuple3.of("乔治", "男", 3)
);
// project can only be applied to a DataStream of Tuples; keep only the name and age fields
DataStream<Tuple> result = users.project(0, 2);

Filtering operator

filter (DataStream -> DataStream)

filter(new FilterFunction)

FilterFunction: x -> true/false

DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// filter out the odd numbers and keep the even ones
DataStream<Integer> even = numbers.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value % 2 == 0; // records for which false is returned are dropped
    }
});

map operator demo

package com.atguigu.day02;
/*
 * API demos of the various transformation operators
 * */

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;
import java.util.List;

public class _07_Transformation_Demos {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // {"uid":1,"name":"zs","friends":[{"fid":2,"name":"aa"},{"fid":3,"name":"bb"}]}
        DataStreamSource<String> streamSource = env.fromElements(
                "{\"uid\":1,\"name\":\"zs\",\"friends\":[{\"fid\":2,\"name\":\"aa\"},{\"fid\":3,\"name\":\"bb\"}]}",
                "{\"uid\":2,\"name\":\"ls\",\"friends\":[{\"fid\":1,\"name\":\"cc\"},{\"fid\":2,\"name\":\"aa\"}]}",
                "{\"uid\":3,\"name\":\"ww\",\"friends\":[{\"fid\":2,\"name\":\"aa\"}]}",
                "{\"uid\":4,\"name\":\"zl\",\"friends\":[{\"fid\":3,\"name\":\"bb\"}]}",
                "{\"uid\":5,\"name\":\"tq\",\"friends\":[{\"fid\":2,\"name\":\"aa\"},{\"fid\":3,\"name\":\"bb\"}]}"
        );
        // convert each JSON record into a JavaBean
        SingleOutputStreamOperator<UserInfo> beanStream = streamSource.map(json -> JSON.parseObject(json, UserInfo.class));
        beanStream.print();
        env.execute();
    }
}
@Data
class UserInfo implements Serializable {
    private  int uid;
    private String name;
    private List<FriendInfo> friends;

}
@Data
class FriendInfo implements Serializable{
    private int fid;
    private String name;
}

