Window环境下安装大数据框架

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 虽然公司配了mac,不过家里的电脑还是上学时候的拯救者,学习大数据框架的时候也基本是直接上虚拟机,每次打开电脑,打开虚拟机,再打开xshell,真的是烦,正好最近重装了一下电脑,于是就想着结合scoop+docker优化一下之前的虚拟机+xshell的路线。

SCOOP的安装

作为window版本的包管理工具,在github上scoop有着完整的安装方式,首先打开自己的powershell,然后输入iwr -useb get.scoop.sh | iex,按照教程的说法,此刻scoop就会安装在你c盘的用户文件夹下的当前用户名的目录下。

可惜的是我没有,他报错了:raw.githubusercontent.com   #comments. put the address here,网上说是dns污染导致,只需要在https://www.ipaddress.com/键入raw.githubusercontent.com就可以获得ip地址(绕开寻址,直接跳转),然后打开c盘的hosts文件,输入ip地址 raw.githubusercontent.com即可。

修改hosts文件

hosts文件在C:\Windows\System32\drivers\etc,修改hosts文件的时候会提示无法保存,用管理员打开也依旧不可以。

需要找到hosts文件,然后打开属性再打开安全,选择编辑,然后点击添加,点高级,点立即查找,再选中我们的用户名,点确定后,权限选择完全控制,确定之后再次打开就可以正常修改保存。

禁止执行脚本

然后我在管理员打开的powershell中继续执行scoop的安装,结果报错PowerShell requires an execution policy in [Unrestricted, RemoteSigned, ByPa,搜索说是执行策略的问题,可以通过set-executionpolicy RemoteSigned来调整执行策略,然后用get-executionpolicy来查看自己当前的执行策略。

结果依旧不可以,结果是不建议管理员打开的powershell安装软件,重新打开一个新的powershell即可安装。

SCOOP安装软件

下面这幅图是我目前安装的软件包

其他没有出问题的,这里就不说了,简单说一下安装的方法和我安装中出现问题的几款软件。

安装很简单,想搜索可以用scoop search 包名搜索,用scoop install 包名安装,用scoop uninstall 包名卸载,scoop list查看当前安装的包,类似yum或者brew的操作。

Java乱码

当我安装完openjdk11后,输入java,发现出现乱码,试图通过调整时间,或者是转换文字格式都无法解决,虽然说是很像中文乱码的问题,不过我尝试过其他中文显示,都是正常的,记得用龙蜥的时候用man也发现类似的问题,我寻思能用就行。

MySQL的启动

安装完mysql,我看给的密码为blank,然后打开新的powershell,输入mysql -u root -p报错。

原因很简单,需要先启动mysql服务,mysqld --standalone,可以通过net stop mysql来停止mysql服务,这样mysql就可以正常使用了。

Python的安装

scoop安装python的时候,报错了,报错信息是ERROR 'dark' isn't installed correctly.不过影响不大,工具这东西是越简单越好,只需要在没有安装python的电脑终端输入python,就可以自动跳转到微软商店的开发者工具,一键安装即可。

Docker的安装

scoop安装好后,现在来安装docker,作为一款容器,docker可以帮助我们快速搭建一个hadoop集群,也可以搭建以flink集群,安装和使用都比打开虚拟机再上传代码等快速而方便。

安装也很简单,只需要在https://docs.docker.com/desktop/install/windows-install/点击Docker Desktop for Windows即可,不过和mac以及linux不同,windows要求Docker桌面端需要四个条件,首先是win10以上版本,然后需要64位处理器,4G的内存,还需要打开虚拟化。

打开Hyper-V

所谓虚拟化,也就是在windows功能(可以搜索启用或关闭windows功能)中选中Hyper-V,只不过windows家庭版是没有该选项的。

需要我们手动建一个文件,然后输入内容如下:

pushd"%~dp0"dir/b%SystemRoot%\servicing\Packages\*Hyper-V*.mum>hyper-v.txtfor/f%%iin('findstr /i . hyper-v.txt 2^>nul')dodism/online/norestart/add-package:"%SystemRoot%\servicing\Packages\%%i"delhyper-v.txtDism/online/enable-feature/featurename:Microsoft-Hyper-V-All/LimitAccess/ALL

需要注意粘贴格式,我直接用vim建个文件粘贴的,然后将文件名改为cmd,用管理员运行文件,会提示是否安装,输入Y安装重启电脑,Hyper-V就会出现并勾选,此时docker就可以正常运行了。

Flink+Kafka示例

docker的具体命令写法,这里就不写了,基本了解docker run,docker exec,dokcer images,docker ps,docker-compose应该也差不多够用。

Docker部分

这里我们首先在一个空文件夹下创建docker-compose.yml

version: "3.7"# zookeeper: 172.26.0.10# kafka: 172.26.0.11# flink: - job manager 172.26.0.20#        - task manager(s) random assigment in 172.26.0.0/24services:  ZooKeeper:    image: ubuntu/zookeeper
    container_name: ZooKeeper
    hostname: ZooKeeper
    volumes:      - ./data:/data
    ports:      - "2181:2181"    networks:      kafka-flink-cluster-network:        ipv4_address: 172.26.0.10  Kafka:    image: wurstmeister/kafka
    ports:      - "9092:9092"    depends_on:      - ZooKeeper
    environment:      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1      KAFKA_MESSAGE_MAX_BYTES: 52428800  # 50M      KAFKA_CREATE_TOPICS: "topic_one:1:1,topic_two:3:1"      KAFKA_ZOOKEEPER_CONNECT: ZooKeeper:2181    volumes:      - ./kafka-logs:/kafka
    networks:      kafka-flink-cluster-network:        ipv4_address: 172.26.0.11  FlinkJobManager:    image: flink:1.14.4-scala_2.12-java8
    container_name: FlinkJobManager
    hostname: FlinkJobManager
    volumes:      - ./flink-jobmanager:/data
    expose:      - "6123"    links:      -  Kafka
    networks:      kafka-flink-cluster-network:        ipv4_address: 172.26.0.20    ports:      - "8081:8081"    command: jobmanager
    environment:      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: FlinkJobManager
  FlinkTaskManager:    image: flink:1.14.4-scala_2.12-java8
#container_name: FlinkTaskManager#hostname: FlinkTaskManager    expose:      - "6121"      - "6122"    depends_on:      - FlinkJobManager
    command: taskmanager
    scale: 2    links:      - "FlinkJobManager:jobmanager"    environment:      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: FlinkJobManager
        taskmanager.numberOfTaskSlots: 2    networks:      - kafka-flink-cluster-network
networks:  kafka-flink-cluster-network:# external: true    driver: bridge
    ipam:      config:        - subnet: 172.26.0.0/24

然后在当前路径的powershell中输入docker-compose up -d,等到安装结束,在docker的可视化界面就可以看到五个启动的容器,或者输入docker ps也可以看到对应的容器,需要留意的是docker容器启动后再添加端口会很麻烦,建议提前写好,也就是文件expose下添加映射。

启动完容器,我们可以输入docker exec -it   docker-flink-zookerper-kafaka-Kafka-1 bash进入kafka中,然后cd opt/kafka/bin中找到消费者和生产者对应的shell。

Scala部分

在idea中新建一个项目,并且在pom中配置上scala和flink依赖,这里需要留意版本问题,此处flink我选用的1.11,而scala选用的是2.12。

<?xmlversion="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkStudy01</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.11.0</flink.version><scala.version>2.12</scala.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><!--  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies></project>

然后写一个测试脚本,从kafka的生产者中读取数据,走过flink,再输出到kafka的消费者中。

packagecom.daishu//import org.apache.flink.runtime.io.network.buffer.Buffer.DataTypeimportorg.apache.flink.streaming.api._importorg.apache.flink.streaming.api.scala._importorg.apache.flink.table.api._importorg.apache.flink.table.api.bridge.scala._importorg.apache.flink.table.descriptors._objectKafkaPipeLineTest {
/*flink从kafka读数据 并写入另一个kafka*/defmain(args: Array[String]): Unit= {
valenv=StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "D:\\JavaProject\\FlinkStudy01\\target\\FlinkStudy01-1.0-SNAPSHOT.jar")
env.setParallelism(2)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
valtableEnv=StreamTableEnvironment.create(env)
/*aa 10 1547718199aa 11 1547718200aa 12 1547718203*/tableEnv.connect(newKafka() // /opt/kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor                     .version("0.11") // kafka版本                     .topic("sensor")
                     .property("zookeeper.connect", "localhost:2181")
                     .property("bootstrap.servers", "localhost:9092")
                    )
    .withFormat(newCsv().fieldDelimiter(','))
    .withSchema(newSchema()
                  .field("id", DataTypes.STRING())
                  .field("wendu", DataTypes.DOUBLE())
                  .field("shiJianChuo", DataTypes.BIGINT())
                 )
    .createTemporaryTable("InputTable")
// 查询转换valsensorTable=tableEnv.from("InputTable")
valresultTable=sensorTable    .select('id, 'wendu)
    .filter('id!=="111")
tableEnv.connect(newKafka() // /opt/kafka_2.12-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinkTest                     .version("0.11") // kafka版本                     .topic("sinkTest")
                     .property("zookeeper.connect", "localhost:2181")
                     .property("bootstrap.servers", "localhost:9092")
                    )
    .withFormat(newCsv())
    .withSchema(newSchema()
                  .field("id", DataTypes.STRING())
                  .field("wendu", DataTypes.DOUBLE())
                 )
    .createTemporaryTable("kafkaOutputTable")
resultTable.insertInto("kafkaOutputTable")
tableEnv.execute("kafka pipeline test")
  }
}

在启动代码前或者后,需要打开两个终端,进入到docker的kafka中,在bin目录下分别输入kafka-console-producer.sh --broker-list localhost:9092 --topic sensorkafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinkTest来启动生产者和消费者。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
SQL 机器学习/深度学习 分布式计算
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
90 1
|
2月前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
109 0
|
8天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
37 2
|
6月前
|
分布式计算 大数据 数据处理
经典大数据处理框架与通用架构对比
【6月更文挑战第15天】本文介绍Apache Beam是谷歌开源的统一数据处理框架,提供可移植API,支持批处理和流处理。与其他架构相比,Lambda和Kappa分别专注于实时和流处理,而Beam在两者之间提供平衡,具备高实时性和数据一致性,但复杂性较高。选择架构应基于业务需求和场景。
490 3
经典大数据处理框架与通用架构对比
|
6月前
|
分布式计算 大数据 数据处理
浅谈几个经典大数据处理框架
【6月更文挑战第15天】本文介绍企业如何在数据洪流中保持竞争力需借助可扩展平台和数据策略。数据管道整合多元数据源,便于分析和流转。Kappa架构专注于实时处理(如通过Kafka、Spark Streaming),适合实时响应场景;Lambda架构结合批处理与实时处理(如Spark、Hadoop与Flink),平衡实时性和批处理,易于开发和维护。Apache Beam提供统一模型,适用于流处理和批处理,提升代码复用和效率。这两种架构满足现代应用对数据一致、性能和灵活性的需求。
512 3
浅谈几个经典大数据处理框架
|
1月前
|
存储 大数据 数据处理
大数据环境下的性能优化策略
大数据环境下的性能优化策略
53 2
|
2月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
83 1
|
3月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
109 3
|
4月前
|
存储 数据可视化 数据挖掘
大数据环境下的房地产数据分析与预测研究的设计与实现
本文介绍了一个基于Python大数据环境下的昆明房地产市场分析与预测系统,通过数据采集、清洗、分析、机器学习建模和数据可视化技术,为房地产行业提供决策支持和市场洞察,探讨了模型的可行性、功能需求、数据库设计及实现过程,并展望了未来研究方向。
234 4
大数据环境下的房地产数据分析与预测研究的设计与实现
|
5月前
|
分布式计算 安全 大数据
HAS插件式Kerberos认证框架:构建安全可靠的大数据生态系统
在教育和科研领域,研究人员需要共享大量数据以促进合作。HAS框架可以提供一个安全的数据共享平台,确保数据的安全性和合规性。