大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)

接上篇:https://developer.aliyun.com/article/1622681?spm=a2c6h.13148508.setting.17.27ab4f0ek8nPMY

运行测试

结果数据

查看 word-count/word-count-result.csv 打开即可看到以下内容:

Stateful 1
any 1
common 1
computations 2
on 1
setup 1
state 1
streams. 1
unbounded 1
& 3
Data 2
DataStream 1
High-availability 1
for 1
perform 1
run 1
to 1
Event-time 1
Flexible 1
Sophisticated 1
framework 1
is 1
scale. 1
Exactly-once 1
ProcessFunction 1
Stream 1
a 1
been 1
handling 1
in 1
late 1
processing 2
Batch 1
DataSet 1
at 2
bounded 1
consistency 1
deployment 1
distributed 1
engine 1
has 1
API 2
Apache 1
Flink 2
SQL 1
Streams 1
all 1
designed 1
over 2
Computations 1
Savepoints 1
and 3
data 2
environments, 1
in-memory 1
speed 1
stateful 1
(Time 1
Correctness 1
State) 1
cluster 1
guarantees 1

单词统计(流数据)

需求说明

Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5秒)的数据进行聚合统计,每隔1秒汇总计算一次,并且把时间窗口内计算结果打印出来。

编写代码

Server部分

编写一个Socket服务,提供一定的数据流

package icu.wzk;


import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class WordCountServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        String ip = "localhost";
        int port = 9999;
        Random random = new Random();
        ServerSocket serverSocket = new ServerSocket();
        InetSocketAddress address = new InetSocketAddress(ip, port);
        serverSocket.bind(address);
        Socket socket = serverSocket.accept();
        OutputStream outputStream = socket.getOutputStream();
        PrintWriter writer = new PrintWriter(outputStream, true);
        for (int i = 0; i < 1000; i ++) {
            int number = random.nextInt(100);
            System.out.println(number);
            writer.println(number);
            Thread.sleep((random.nextInt(900) + 100));
        }
        socket.close();
        serverSocket.close();
    }

}

Flink部分

连接到上述的Server部分

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class WordCount2 {

    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 9999;

        // 获取 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取 Socket 输入数据
        DataStreamSource<String> textStream = env.socketTextStream(ip, port, "\n");
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = textStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> word = wordCount
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .sum(1);

        // 输出并运行
        word.print();
        env.execute("Word Count");
    }

}

观察结果

Server部分

35
18
84
72
24
51
15
13
65
98
55
68
22
84
17

Flink部分

3> (35,1)
4> (18,1)
3> (35,1)
5> (84,1)
4> (18,1)
6> (72,1)
3> (35,1)
5> (84,1)
5> (24,1)
3> (35,1)
6> (72,1)
4> (18,1)
7> (51,1)
5> (24,1)
5> (84,1)
4> (15,1)
6> (72,1)
7> (51,1)
3> (35,1)
4> (15,1)
4> (18,1)

运行结果过程截图如下所示:

过程总结

  • 获得一个执行环境
  • 加载、创建 初始化环境
  • 指定数据操作的算子
  • 指定结果数据存放位置
  • 调用Execute触发执行程序

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正的触发执行程序。

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
2月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
184 14
|
2月前
|
传感器 人工智能 监控
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
139 14
|
1月前
|
传感器 人工智能 监控
拔俗多模态跨尺度大数据AI分析平台:让复杂数据“开口说话”的智能引擎
在数字化时代,多模态跨尺度大数据AI分析平台应运而生,打破数据孤岛,融合图像、文本、视频等多源信息,贯通微观与宏观尺度,实现智能诊断、预测与决策,广泛应用于医疗、制造、金融等领域,推动AI从“看懂”到“会思考”的跃迁。
|
2月前
|
机器学习/深度学习 传感器 监控
吃得安心靠数据?聊聊用大数据盯紧咱们的餐桌安全
吃得安心靠数据?聊聊用大数据盯紧咱们的餐桌安全
105 1
|
2月前
|
数据采集 自动驾驶 机器人
数据喂得好,机器人才能学得快:大数据对智能机器人训练的真正影响
数据喂得好,机器人才能学得快:大数据对智能机器人训练的真正影响
208 1
|
3月前
|
机器学习/深度学习 监控 大数据
数据当“安全带”:金融市场如何用大数据玩转风险控制?
数据当“安全带”:金融市场如何用大数据玩转风险控制?
135 10
|
3月前
|
机器学习/深度学习 自然语言处理 监控
大数据如何影响新兴市场投资决策?——数据才是真正的风向标
大数据如何影响新兴市场投资决策?——数据才是真正的风向标
92 3
|
4月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
161 4
|
3月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
147 0
|
4月前
|
分布式计算 DataWorks 数据处理
在数据浪潮中前行:记录一次我与ODPS的实践、思考与展望
本文详细介绍了在 AI 时代背景下,如何利用阿里云 ODPS 平台(尤其是 MaxCompute)进行分布式多模态数据处理的实践过程。内容涵盖技术架构解析、完整操作流程、实际部署步骤以及未来发展方向,同时结合 CSDN 博文深入探讨了多模态数据处理的技术挑战与创新路径,为企业提供高效、低成本的大规模数据处理方案。
296 3