Window环境下安装大数据框架

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: 虽然公司配了mac,不过家里的电脑还是上学时候的拯救者,学习大数据框架的时候也基本是直接上虚拟机,每次打开电脑,打开虚拟机,再打开xshell,真的是烦,正好最近重装了一下电脑,于是就想着结合scoop+docker优化一下之前的虚拟机+xshell的路线。

SCOOP的安装

作为window版本的包管理工具,在github上scoop有着完整的安装方式,首先打开自己的powershell,然后输入iwr -useb get.scoop.sh | iex,按照教程的说法,此刻scoop就会安装在你c盘的用户文件夹下的当前用户名的目录下。

可惜的是我没有,他报错了:raw.githubusercontent.com   #comments. put the address here,网上说是dns污染导致,只需要在https://www.ipaddress.com/键入raw.githubusercontent.com就可以获得ip地址(绕开寻址,直接跳转),然后打开c盘的hosts文件,输入ip地址 raw.githubusercontent.com即可。

修改hosts文件

hosts文件在C:\Windows\System32\drivers\etc,修改hosts文件的时候会提示无法保存,用管理员打开也依旧不可以。

需要找到hosts文件,然后打开属性再打开安全,选择编辑,然后点击添加,点高级,点立即查找,再选中我们的用户名,点确定后,权限选择完全控制,确定之后再次打开就可以正常修改保存。

禁止执行脚本

然后我在管理员打开的powershell中继续执行scoop的安装,结果报错PowerShell requires an execution policy in [Unrestricted, RemoteSigned, ByPa,搜索说是执行策略的问题,可以通过set-executionpolicy RemoteSigned来调整执行策略,然后用get-executionpolicy来查看自己当前的执行策略。

结果依旧不可以,结果是不建议管理员打开的powershell安装软件,重新打开一个新的powershell即可安装。

SCOOP安装软件

下面这幅图是我目前安装的软件包

其他没有出问题的,这里就不说了,简单说一下安装的方法和我安装中出现问题的几款软件。

安装很简单,想搜索可以用scoop search 包名搜索,用scoop install 包名安装,用scoop uninstall 包名卸载,scoop list查看当前安装的包,类似yum或者brew的操作。

Java乱码

当我安装完openjdk11后,输入java,发现出现乱码,试图通过调整时间,或者是转换文字格式都无法解决,虽然说是很像中文乱码的问题,不过我尝试过其他中文显示,都是正常的,记得用龙蜥的时候用man也发现类似的问题,我寻思能用就行。

MySQL的启动

安装完mysql,我看给的密码为blank,然后打开新的powershell,输入mysql -u root -p报错。

原因很简单,需要先启动mysql服务,mysqld --standalone,可以通过net stop mysql来停止mysql服务,这样mysql就可以正常使用了。

Python的安装

scoop安装python的时候,报错了,报错信息是ERROR 'dark' isn't installed correctly.不过影响不大,工具这东西是越简单越好,只需要在没有安装python的电脑终端输入python,就可以自动跳转到微软商店的开发者工具,一键安装即可。

Docker的安装

scoop安装好后,现在来安装docker,作为一款容器,docker可以帮助我们快速搭建一个hadoop集群,也可以搭建以flink集群,安装和使用都比打开虚拟机再上传代码等快速而方便。

安装也很简单,只需要在https://docs.docker.com/desktop/install/windows-install/点击Docker Desktop for Windows即可,不过和mac以及linux不同,windows要求Docker桌面端需要四个条件,首先是win10以上版本,然后需要64位处理器,4G的内存,还需要打开虚拟化。

打开Hyper-V

所谓虚拟化,也就是在windows功能(可以搜索启用或关闭windows功能)中选中Hyper-V,只不过windows家庭版是没有该选项的。

需要我们手动建一个文件,然后输入内容如下:

pushd"%~dp0"dir/b%SystemRoot%\servicing\Packages\*Hyper-V*.mum>hyper-v.txtfor/f%%iin('findstr /i . hyper-v.txt 2^>nul')dodism/online/norestart/add-package:"%SystemRoot%\servicing\Packages\%%i"delhyper-v.txtDism/online/enable-feature/featurename:Microsoft-Hyper-V-All/LimitAccess/ALL

需要注意粘贴格式,我直接用vim建个文件粘贴的,然后将文件名改为cmd,用管理员运行文件,会提示是否安装,输入Y安装重启电脑,Hyper-V就会出现并勾选,此时docker就可以正常运行了。

Flink+Kafka示例

docker的具体命令写法,这里就不写了,基本了解docker run,docker exec,dokcer images,docker ps,docker-compose应该也差不多够用。

Docker部分

这里我们首先在一个空文件夹下创建docker-compose.yml

version: "3.7"# zookeeper: 172.26.0.10# kafka: 172.26.0.11# flink: - job manager 172.26.0.20#        - task manager(s) random assigment in 172.26.0.0/24services:  ZooKeeper:    image: ubuntu/zookeeper
    container_name: ZooKeeper
    hostname: ZooKeeper
    volumes:      - ./data:/data
    ports:      - "2181:2181"    networks:      kafka-flink-cluster-network:        ipv4_address: 172.26.0.10  Kafka:    image: wurstmeister/kafka
    ports:      - "9092:9092"    depends_on:      - ZooKeeper
    environment:      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1      KAFKA_MESSAGE_MAX_BYTES: 52428800  # 50M      KAFKA_CREATE_TOPICS: "topic_one:1:1,topic_two:3:1"      KAFKA_ZOOKEEPER_CONNECT: ZooKeeper:2181    volumes:      - ./kafka-logs:/kafka
    networks:      kafka-flink-cluster-network:        ipv4_address: 172.26.0.11  FlinkJobManager:    image: flink:1.14.4-scala_2.12-java8
    container_name: FlinkJobManager
    hostname: FlinkJobManager
    volumes:      - ./flink-jobmanager:/data
    expose:      - "6123"    links:      -  Kafka
    networks:      kafka-flink-cluster-network:        ipv4_address: 172.26.0.20    ports:      - "8081:8081"    command: jobmanager
    environment:      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: FlinkJobManager
  FlinkTaskManager:    image: flink:1.14.4-scala_2.12-java8
#container_name: FlinkTaskManager#hostname: FlinkTaskManager    expose:      - "6121"      - "6122"    depends_on:      - FlinkJobManager
    command: taskmanager
    scale: 2    links:      - "FlinkJobManager:jobmanager"    environment:      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: FlinkJobManager
        taskmanager.numberOfTaskSlots: 2    networks:      - kafka-flink-cluster-network
networks:  kafka-flink-cluster-network:# external: true    driver: bridge
    ipam:      config:        - subnet: 172.26.0.0/24

然后在当前路径的powershell中输入docker-compose up -d,等到安装结束,在docker的可视化界面就可以看到五个启动的容器,或者输入docker ps也可以看到对应的容器,需要留意的是docker容器启动后再添加端口会很麻烦,建议提前写好,也就是文件expose下添加映射。

启动完容器,我们可以输入docker exec -it   docker-flink-zookerper-kafaka-Kafka-1 bash进入kafka中,然后cd opt/kafka/bin中找到消费者和生产者对应的shell。

Scala部分

在idea中新建一个项目,并且在pom中配置上scala和flink依赖,这里需要留意版本问题,此处flink我选用的1.11,而scala选用的是2.12。

<?xmlversion="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkStudy01</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.11.0</flink.version><scala.version>2.12</scala.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><!--  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies></project>

然后写一个测试脚本,从kafka的生产者中读取数据,走过flink,再输出到kafka的消费者中。

packagecom.daishu//import org.apache.flink.runtime.io.network.buffer.Buffer.DataTypeimportorg.apache.flink.streaming.api._importorg.apache.flink.streaming.api.scala._importorg.apache.flink.table.api._importorg.apache.flink.table.api.bridge.scala._importorg.apache.flink.table.descriptors._objectKafkaPipeLineTest {
/*flink从kafka读数据 并写入另一个kafka*/defmain(args: Array[String]): Unit= {
valenv=StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "D:\\JavaProject\\FlinkStudy01\\target\\FlinkStudy01-1.0-SNAPSHOT.jar")
env.setParallelism(2)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
valtableEnv=StreamTableEnvironment.create(env)
/*aa 10 1547718199aa 11 1547718200aa 12 1547718203*/tableEnv.connect(newKafka() // /opt/kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor                     .version("0.11") // kafka版本                     .topic("sensor")
                     .property("zookeeper.connect", "localhost:2181")
                     .property("bootstrap.servers", "localhost:9092")
                    )
    .withFormat(newCsv().fieldDelimiter(','))
    .withSchema(newSchema()
                  .field("id", DataTypes.STRING())
                  .field("wendu", DataTypes.DOUBLE())
                  .field("shiJianChuo", DataTypes.BIGINT())
                 )
    .createTemporaryTable("InputTable")
// 查询转换valsensorTable=tableEnv.from("InputTable")
valresultTable=sensorTable    .select('id, 'wendu)
    .filter('id!=="111")
tableEnv.connect(newKafka() // /opt/kafka_2.12-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinkTest                     .version("0.11") // kafka版本                     .topic("sinkTest")
                     .property("zookeeper.connect", "localhost:2181")
                     .property("bootstrap.servers", "localhost:9092")
                    )
    .withFormat(newCsv())
    .withSchema(newSchema()
                  .field("id", DataTypes.STRING())
                  .field("wendu", DataTypes.DOUBLE())
                 )
    .createTemporaryTable("kafkaOutputTable")
resultTable.insertInto("kafkaOutputTable")
tableEnv.execute("kafka pipeline test")
  }
}

在启动代码前或者后,需要打开两个终端,进入到docker的kafka中,在bin目录下分别输入kafka-console-producer.sh --broker-list localhost:9092 --topic sensorkafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinkTest来启动生产者和消费者。

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
6月前
|
数据采集 搜索推荐 Java
Java 大视界 -- Java 大数据在智能教育虚拟学习环境构建与用户体验优化中的应用(221)
本文探讨 Java 大数据在智能教育虚拟学习环境中的应用,涵盖多源数据采集、个性化推荐、实时互动优化等核心技术,结合实际案例分析其在提升学习体验与教学质量中的成效,并展望未来发展方向与技术挑战。
|
6月前
|
传感器 分布式计算 大数据
“用大数据盯着天看地”——聊聊环境监测的精准化升级
“用大数据盯着天看地”——聊聊环境监测的精准化升级
154 0
|
7月前
|
传感器 机器学习/深度学习 算法
Java 大视界 -- Java 大数据在智能农业温室环境调控与作物生长模型构建中的应用(189)
本文探讨了Java大数据在智能农业温室环境调控与作物生长模型构建中的关键应用。通过高效采集、传输与处理温室环境数据,结合机器学习算法,实现温度、湿度、光照等参数的智能调控,提升作物产量与品质。同时,融合多源数据构建精准作物生长模型,助力农业智能化、精细化发展,推动农业现代化进程。
|
SQL 机器学习/深度学习 分布式计算
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
821 1
|
分布式计算 大数据 数据处理
经典大数据处理框架与通用架构对比
【6月更文挑战第15天】本文介绍Apache Beam是谷歌开源的统一数据处理框架,提供可移植API,支持批处理和流处理。与其他架构相比,Lambda和Kappa分别专注于实时和流处理,而Beam在两者之间提供平衡,具备高实时性和数据一致性,但复杂性较高。选择架构应基于业务需求和场景。
985 3
经典大数据处理框架与通用架构对比
|
分布式计算 大数据 数据处理
浅谈几个经典大数据处理框架
【6月更文挑战第15天】本文介绍企业如何在数据洪流中保持竞争力需借助可扩展平台和数据策略。数据管道整合多元数据源,便于分析和流转。Kappa架构专注于实时处理(如通过Kafka、Spark Streaming),适合实时响应场景;Lambda架构结合批处理与实时处理(如Spark、Hadoop与Flink),平衡实时性和批处理,易于开发和维护。Apache Beam提供统一模型,适用于流处理和批处理,提升代码复用和效率。这两种架构满足现代应用对数据一致、性能和灵活性的需求。
1334 3
浅谈几个经典大数据处理框架
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
632 0
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
689 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
575 8
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
507 2