请问一下,有没有人用过zeppelin的,都是怎么支持flink 1.15.0。。

请问一下,有没有人用过zeppelin的,都是怎么支持flink 1.15.0。。

展开
收起
游客3oewgrzrf6o5c 2022-08-17 14:35:33 557 分享 版权
1 条回答
写回答
取消 提交回答
  • 全栈JAVA领域创作者

    Zeppelin是一个基于Web的交互式计算环境,支持多种语言,包括Python、R、SQL、Java等。它可以在浏览器中运行,提供了类似于命令行的交互式界面,并支持可视化、自动补全、调试等功能。 Zeppelin支持使用Flink作为数据处理引擎,可以通过在zeppelin notebook中直接调用Flink SQL语句来实现。以下是一个使用Zeppelin支持Flink 1.15.0的示例:

    scala Copy code from pyspark.sql import SparkSession from pyspark.sql.functions import from_json, col

    创建SparkSession对象

    spark = SparkSession.builder \n .appName("Flink SQL with Zeppelin") \n .enableHiveSupport() \n .getOrCreate()

    从json文件读取数据

    json_df = spark \n .read \n .option("header", "true") \n .json("input.json") \n .createOrReplaceTempView("json_table")

    将数据转换为Flink表格

    flink_table = spark \n .sql("FROM JsonTableSource") \n .option("header", "true") \n .option("path", "jars/flink-1.15.0-bin-scala_2.11/lib/flink-streaming-java_2.11-1.15.0.jar") \n .getOrCreateExternalTable("flink_table")

    将数据写入Flink表格

    flink_table \n .writeStream \n .outputMode("append") \n .format("csv") \n .option("path", "jars/flink-1.15.0-bin-scala_2.11/lib/flink-streaming-java_2.11-1.15.0.jar") \n .option("checkpointLocation", "chkpt/") \n .saveAsTable("output_table") 上述示例中,我们首先创建了一个SparkSession对象,然后从一个JSON文件中读取数据,将其转换为Flink表格,并将数据写入到一个Flink表格中。其中,from_json和col函数是针对Python语言的支持,可以在zeppelin notebook中使用。 需要注意的是,Zeppelin支持的Flink版本需要与Zeppelin的Flink版本相匹配。如果使用的Flink版本不同,需要先安装相应版本的Flink和Zeppelin,并将Zeppelin的配置文件中的相关参数进行更改。

    2023-06-19 19:04:53
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理