Apache Flink1.12开发环境配置

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

Flink1.12官方文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/

(1)基于IDEA配置开发环境


基本环境:


IntelliJ IDEA 2021

apache-maven-3.5.4

jdk1.8.0_271

scala-2.11.12

配置pom.xml文件:

https://flink.apache.org/zh/downloads.html#section-9

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <flink.version>1.12.1</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <!--java-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!--scala-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>

(2)基于Java语言的WordCount程序开发


package com.aikfk.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/5 3:07 下午
 */
public class WordCountJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Integer>> dataStream = env.socketTextStream("bigdata-pro-m07",9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
        dataStream.print();
        env.execute("Window WordCount");
    }
    public static class Splitter implements FlatMapFunction<String,Tuple2<String,Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")){
                out.collect(new Tuple2<String,Integer>(word,1));
            }
        }
    }
}

运行结果

3> (java,2)
2> (hive,2)
3> (java,1)
2> (hive,1)
1> (spark,2)

(3)基于Scala语言的WordCount程序开发


package com.aikfk.flink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("bigdata-pro-m07", 9999)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
    counts.print()
    env.execute("Window Stream WordCount")
  }
}

运行结果

3> (java,2)
2> (hive,2)
3> (java,1)
2> (hive,1)
1> (spark,2)



(4)程序打包并发布测试


IDEA中打Jar包时要特别注意这里,不能用默认的地址,需要改一下MANIFEST. MF的地址到src目录

20.png

启动集群服务:

bin/start-cluster.sh 

运行jar包:

bin/flink run -jar /opt/jars/WordCountJava.jar com.aikfk.flink.WordCountJava

Job has been submitted with JobID eff5617c683e2ea65bd498b4c4d062a8

运行NC服务:

nc -lk 9999

查看结果:

cat flink-root-taskexecutor-0-bigdata-pro-m07.out

(def,1)
(fdfr,1)
(sdijo,1)
(re,1)
(sedwvf,1)
(f,1)
(wedfwb,1)

查看WEB监控:21.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
363 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
979 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
155 3
|
2月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
183 9
|
3月前
|
存储 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
60 1
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
|
3月前
|
缓存 前端开发 应用服务中间件
CORS跨域+Nginx配置、Apache配置
CORS跨域+Nginx配置、Apache配置
320 7
|
3月前
|
消息中间件 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
57 2
|
3月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
302 0
|
5月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
57 1
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等