一、Map
1.1 介绍
1.DataStream->DataStream 数据集转换。
2.数据集合中的元素一一映射的关系。
1.2 MapFunction
public class Map01 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // source DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5); // 实现一:map 方法做映射,输入输出一直的2倍 SingleOutputStreamOperator<Integer> result1 = nums.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return value * 2; } }); // 实现二:lambada 表达式的应用 SingleOutputStreamOperator<Integer> result2 = nums.map(i -> i * 2); // sink result1.print(); result2.print(); env.execute(); } }
结果如下:
1.3 RichMapFunction
1.open() 方法:在构造方法之后、map 方法之前调用,且只执行一次,是初始化方法,常用于建立数据库连接等资源初始化。
2.Configuration为全局的配置
3.close() 方法:在算子销毁之前执行一次,通常用于资源的释放。
程序如下:
// RichMapFunction nums.map(new RichMapFunction<Integer, Integer>() { // open 在构造方法之后,map方法之前,只执行一次,初始化方法 使用数据库 // Configuration为全局的配置 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public Integer map(Integer value) throws Exception { return value * 10; } // close 销毁之前执行一次,通常为资源额的释放 @Override public void close() throws Exception { super.close(); } });
二、FlatMap
2.1 介绍
输入一个元素,会被切分成多个元素。一对多。
2.2 FlatMapFunction
public class FlatMap01 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> lines = env.fromElements("GW001001 GW002002 GW003003", "GW001001 GW002002 GW003003", "GW001001 GW002002 GW003003"); SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> out) throws Exception { //实现一:jdk8的流式处理 lambada表达式【推荐】 //Arrays.stream(line.split(" ")).forEach(out::collect); //实现二: //Arrays.asList(line.split(" ")).forEach(w -> out.collect(w)); //实现三:最原始的方式 String[] words = line.split(" "); for (String word : words) { out.collect(word); } } }); words.print(); env.execute(); } }
结果如下:
2.3 RichFlatMapFunction
与 RichMapFunction 类似,同样提供 open() 和 close() 生命周期方法。
三、Filter
3.1 介绍
1.返回 true 时保留该元素,返回 false 时将其过滤掉。
2.实现对输入的数据进行逻辑判断,判断是否是奇数?
3.2 FilterFunction
public class Filter01 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9); //实现一:ture是留下,false时过滤 SingleOutputStreamOperator<Integer> filter1 = nums.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value % 2 != 0; } }); //实现二:lambada表达式:filter2 //SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> i >= 5); SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> { //换行要有return return i >= 5; }); filter1.print(); filter2.print(); env.execute(); } }
计算结果:
四、KeyBy
实时计算的算子
4.1 lambda实现
1.实现输入一个、返回一个,使用 lambda 表达式代替 new Function;利用虚拟机开放一个 socket 端口,实现实时聚合计算。
2.虚拟机centos中:nc -lk 8888
3.元组也是一个特殊的集合,角标 0 开始 最大Tuple25
4.代码实现:
public class KeyBy01 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888); //使用lambada表达式 代替new Function //输入一个返回一个 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT)); //元组也是一个特殊的集合,角标 0 开始 最大Tuple25 KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); //聚合 keyed.print(); env.execute("KeyBy01"); } }
运行结果:
4.2 KeyBy自定义实体类
1.实体类WordAndCount
/**
 * POJO carried through the keyBy/sum examples (one word plus its running count).
 *
 * Flink's field-name based keyBy("word") / sum("counts") access fields
 * reflectively, which requires the POJO rules: a public no-argument
 * constructor and public fields (or getter/setter pairs). The original class
 * had private fields with no accessors, so Flink would treat it as a
 * GenericType and keyBy("word") would fail at runtime.
 */
public class WordAndCount {

    public String word;
    public Long counts;

    /** Required by Flink's POJO serializer. */
    public WordAndCount() {
    }

    public WordAndCount(String word, Long counts) {
        this.word = word;
        this.counts = counts;
    }

    /** Static factory mirroring Tuple2.of for readable call sites. */
    public static WordAndCount of(String word, Long counts) {
        return new WordAndCount(word, counts);
    }

    @Override
    public String toString() {
        return "WordAndCount{word='" + word + "', counts=" + counts + "}";
    }
}
2.keyby sum
public class KeyBy02 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> lines = env.socketTextStream("192.168.52.200", 8808); //输入一个返回一个 SingleOutputStreamOperator<WordAndCount> wordAndOne = lines.map(new MapFunction<String, WordAndCount>() { @Override public WordAndCount map(String value) throws Exception { return Turbine.of(value,1L); } }); //根据实体类的字段进行聚合 KeyedStream<WordAndCount, Tuple> keyed = wordAndOne.keyBy("word"); //聚合 SingleOutputStreamOperator<WordAndCount> sumed = keyed.sum("counts"); keyed.print(); sumed.print(); env.execute(); } }
运行结果:
4.3 keyBy多字段进行分组
代码:
public class KeyBy03 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828); //山东,烟台,2000 //山东,烟台,2000 //山东,烟台,2000 SingleOutputStreamOperator<Tuple3<String, String, Double>> provinceCityMoney = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() { @Override public Tuple3<String, String, Double> map(String line) throws Exception { //切分 String[] fields = line.split(","); String province = fields[0]; String city = fields[1]; double money = Double.parseDouble(fields[2]); return Tuple3.of(province, city, money); } }); //按照省份,城市分组 多个字段进行分组,最后一个字段进行聚合 /** * 如果是自己定义的bean实体类,可以进行将字段写进去 */ SingleOutputStreamOperator<Tuple3<String, String, Double>> summed = provinceCityMoney.keyBy(0, 1).sum(2); summed.print(); env.execute(); } }
运行结果:
五、Reduce
聚合算子:sum 只能做加法,而 reduce 的合并逻辑可以自定义(例如乘法、取最大值等)。
public class Reduce01 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828); //使用lambada表达式 代替new Function //输入一个返回一个 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1)) .returns(Types.TUPLE(Types.STRING, Types.INT)); //元组也是一个特殊的集合,角标 0 开始 最大Tuple25 KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); //聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { String key = value1.f0; Integer count1 = value1.f1; Integer count2 = value2.f1; Integer counts = count1 + count2; return Tuple2.of(key, counts); } }); reduced.print(); env.execute(); } }
运行结果:
六、Max
例如:输入
spark,10
spark,20
hadoop,10
和历史数据比较,求最大次数的,最大的留下,最小的丢弃
public class Max01 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String line) throws Exception { String[] fields = line.split(","); String words = fields[0]; int num = Integer.parseInt(fields[1]); return Tuple2.of(words, num); } }); //按照单词进行分组 KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); //求最大次数的,最大的留下,最小的丢弃 SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1); max.print(); env.execute(); } }
运行结果:
七、Sink
7.1 print()
打印到控制台也是sink.
7.2 CSV
public class AddSink01 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String line) throws Exception { String[] fields = line.split(","); String words = fields[0]; int num = Integer.parseInt(fields[1]); return Tuple2.of(words, num); } }); //按照单词进行分组 KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1); /** * 自定义sink,比如 写入数据库,磁盘等等 * 不需要有返回就可以 */ max.addSink(new SinkFunction<Tuple2<String, Integer>>() { @Override public void invoke(Tuple2<String, Integer> value, Context context) throws Exception { System.out.println(value); } }); /** * 写入磁盘 * 如果是写入scv文件 必须时tuple格式 */ max.writeAsCsv("F:\\out222", FileSystem.WriteMode.OVERWRITE); max.print(); env.execute(); } }
7.3 RedisSink
/**
 * Rich sink that writes each Turbine record into a Redis hash:
 * HSET &lt;word&gt; &lt;province&gt; &lt;counts&gt;.
 */
public class MyRedisSink extends RichSinkFunction<Turbine> {

    // transient: the Jedis client is not serializable — it is created per task
    // instance in open() instead of being shipped with the job graph.
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Connection settings come from the job-wide global parameters.
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        // port 6379, connect timeout 5000 ms
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }

    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        // Reconnect lazily if the connection dropped between records.
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // Write to Redis: hash key = word, field = province, value = counts.
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }

    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}
7.4 MySqlSink
public class MySqlSink extends RichSinkFunction<GW200001> { //最好连接不参与序列化 private transient Connection conn = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //创建mysql连接 conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/turbine?characterEncoding=UTF-8", "root", "123456"); System.out.println("拿到连接了"); } @Override public void invoke(GW200001 gw200001, Context context) throws Exception { //更新,插入,统计业务 PreparedStatement pstm = null; try { pstm = conn.prepareStatement("insert into gw200001(wt_number, wt_date_time) values(?,?)"); pstm.setString(1, gw200001.wt_number); pstm.setString(2, gw200001.wt_date_time); //执行sql executeUpdate() executeQuery() System.out.println("执行sql"); pstm.execute(); } finally { if (pstm != null) { pstm.close(); System.out.println("正常关闭"); } } } @Override public void close() throws Exception { super.close(); conn.close(); System.out.println("正常关闭了!"); } }