Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!

简介: Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

当前章节

继续上一节的内容:https://blog.csdn.net/w776341482/article/details/139875037


上一节中,我们需要使用 nc 或者 telnet 等工具来模拟 Socket 流。这节我们写一个 ServerSocket 来模拟这些 操作,让流自动的写入不用我们手动去操作了。

POM.xml

与上一节一致,不需要修改

编写代码

还是和上一节一样的 Socket 流,这里略去其他的代码

DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer

继承 Thread 启动一个线程来进行Flink的服务

package icu.wzk.demo03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkServer extends Thread {

    @Override
    public void run() {
        String ip = "0.0.0.0";
        int port = 9999;
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    collector.collect(Tuple2.of(word, 1L));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator
                .keyBy(new KeySelector<Tuple2<String, Long>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
                        return stringLongTuple2.f0;
                    }
                })
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                .sum(1);
        System.out.println("wait word print()");
        word.print();
        try {
            streamExecutionEnvironment.execute("stream!");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

NumRandom

使用 ServerSocket 实现一个持续的流输出

package icu.wzk.demo03;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class RandomNumClient extends Thread {

    @Override
    public void run() {
        String ip = "0.0.0.0";
        int port = 9999;
        try {
            ServerSocket serverSocket = new ServerSocket();
            InetSocketAddress address = new InetSocketAddress(ip, port);
            serverSocket.bind(address);
            Socket socket = serverSocket.accept();
            OutputStream output = socket.getOutputStream();
            PrintWriter writer = new PrintWriter(output, true);
            Random random = new Random();
            for (int i = 0; i < 500; i ++) {
                int randomNumber = random.nextInt(10) + 1;
                writer.println(randomNumber);
                System.out.println("ServerSocket Send To Flink: " + randomNumber);
                Thread.sleep(200);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

StartApp

将上述的两个类组装起来

package icu.wzk.demo03;


public class StartApp {

    public static void main(String[] args) throws Exception {
        RandomNumClient randomNumClient = new RandomNumClient();
        FlinkServer flinkServer = new FlinkServer();
        flinkServer.start();
        randomNumClient.start();
    }

}
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
8月前
|
Java API 数据处理
Java新特性:使用Stream API重构你的数据处理
Java新特性:使用Stream API重构你的数据处理
|
8月前
|
Java 大数据 API
Java Stream API:现代集合处理与函数式编程
Java Stream API:现代集合处理与函数式编程
375 100
|
8月前
|
Java API 数据处理
Java Stream API:现代集合处理新方式
Java Stream API:现代集合处理新方式
373 101
|
8月前
|
并行计算 Java 大数据
Java Stream API:现代数据处理之道
Java Stream API:现代数据处理之道
427 101
|
9月前
|
存储 Java API
Java Stream API:现代数据处理之道
Java Stream API:现代数据处理之道
451 188
|
9月前
|
存储 Java API
Java Stream API:现代数据处理之道
Java Stream API:现代数据处理之道
355 92
|
8月前
|
机器学习/深度学习 人工智能 测试技术
EdgeMark:嵌入式人工智能工具的自动化与基准测试系统——论文阅读
EdgeMark是一个面向嵌入式AI的自动化部署与基准测试系统,支持TensorFlow Lite Micro、Edge Impulse等主流工具,通过模块化架构实现模型生成、优化、转换与部署全流程自动化,并提供跨平台性能对比,助力开发者在资源受限设备上高效选择与部署AI模型。
682 9
EdgeMark:嵌入式人工智能工具的自动化与基准测试系统——论文阅读
|
7月前
|
Java Unix Go
【Java】(8)Stream流、文件File相关操作,IO的含义与运用
Java 为 I/O 提供了强大的而灵活的支持,使其更广泛地应用到文件传输和网络编程中。!但本节讲述最基本的和流与 I/O 相关的功能。我们将通过一个个例子来学习这些功能。
294 1
|
8月前
|
Java 测试技术 API
自动化测试工具集成及实践
自动化测试用例的覆盖度及关键点最佳实践、自动化测试工具、集成方法、自动化脚本编写等(兼容多语言(Java、Python、Go、C++、C#等)、多框架(Spring、React、Vue等))
665 6
|
8月前
|
存储 数据可视化 Java
Java Stream API 的强大功能
Java Stream API 是 Java 8 引入的重要特性,它改变了集合数据的处理方式。通过声明式语法,开发者可以更简洁地进行过滤、映射、聚合等操作。Stream API 支持惰性求值和并行处理,提升了代码效率和可读性,是现代 Java 开发不可或缺的工具。
162 0
Java Stream API 的强大功能