SCOOP的安装
作为window版本的包管理工具,在github上scoop有着完整的安装方式,首先打开自己的powershell,然后输入iwr -useb get.scoop.sh | iex
,按照教程的说法,此刻scoop就会安装在你c盘的用户文件夹下的当前用户名的目录下。
可惜的是我没有,他报错了:raw.githubusercontent.com #comments. put the address here
,网上说是dns污染导致,只需要在https://www.ipaddress.com/键入raw.githubusercontent.com
就可以获得ip地址(绕开寻址,直接跳转),然后打开c盘的hosts文件,输入ip地址 raw.githubusercontent.com
即可。
修改hosts文件
hosts文件在C:\Windows\System32\drivers\etc
,修改hosts文件的时候会提示无法保存,用管理员打开也依旧不可以。
需要找到hosts文件,然后打开属性再打开安全,选择编辑,然后点击添加,点高级,点立即查找,再选中我们的用户名,点确定后,权限选择完全控制,确定之后再次打开就可以正常修改保存。
禁止执行脚本
然后我在管理员打开的powershell中继续执行scoop的安装,结果报错PowerShell requires an execution policy in [Unrestricted, RemoteSigned, ByPa
,搜索说是执行策略的问题,可以通过set-executionpolicy RemoteSigned
来调整执行策略,然后用get-executionpolicy
来查看自己当前的执行策略。
结果依旧不可以,结果是不建议管理员打开的powershell安装软件,重新打开一个新的powershell即可安装。
SCOOP安装软件
下面这幅图是我目前安装的软件包
其他没有出问题的,这里就不说了,简单说一下安装的方法和我安装中出现问题的几款软件。
安装很简单,想搜索可以用scoop search 包名搜索,用scoop install 包名安装,用scoop uninstall 包名卸载,scoop list查看当前安装的包,类似yum或者brew的操作。
Java乱码
当我安装完openjdk11后,输入java,发现出现乱码,试图通过调整时间,或者是转换文字格式都无法解决,虽然说是很像中文乱码的问题,不过我尝试过其他中文显示,都是正常的,记得用龙蜥的时候用man也发现类似的问题,我寻思能用就行。
MySQL的启动
安装完mysql,我看给的密码为blank,然后打开新的powershell,输入mysql -u root -p
报错。
原因很简单,需要先启动mysql服务,mysqld --standalone
,可以通过net stop mysql
来停止mysql服务,这样mysql就可以正常使用了。
Python的安装
scoop安装python的时候,报错了,报错信息是ERROR 'dark' isn't installed correctly.
不过影响不大,工具这东西是越简单越好,只需要在没有安装python的电脑终端输入python,就可以自动跳转到微软商店的开发者工具,一键安装即可。
Docker的安装
scoop安装好后,现在来安装docker,作为一款容器,docker可以帮助我们快速搭建一个hadoop集群,也可以搭建以flink集群,安装和使用都比打开虚拟机再上传代码等快速而方便。
安装也很简单,只需要在https://docs.docker.com/desktop/install/windows-install/点击Docker Desktop for Windows即可,不过和mac以及linux不同,windows要求Docker桌面端需要四个条件,首先是win10以上版本,然后需要64位处理器,4G的内存,还需要打开虚拟化。
打开Hyper-V
所谓虚拟化,也就是在windows功能(可以搜索启用或关闭windows功能)中选中Hyper-V,只不过windows家庭版是没有该选项的。
需要我们手动建一个文件,然后输入内容如下:
pushd"%~dp0"dir/b%SystemRoot%\servicing\Packages\*Hyper-V*.mum>hyper-v.txtfor/f%%iin('findstr /i . hyper-v.txt 2^>nul')dodism/online/norestart/add-package:"%SystemRoot%\servicing\Packages\%%i"delhyper-v.txtDism/online/enable-feature/featurename:Microsoft-Hyper-V-All/LimitAccess/ALL
需要注意粘贴格式,我直接用vim建个文件粘贴的,然后将文件名改为cmd,用管理员运行文件,会提示是否安装,输入Y安装重启电脑,Hyper-V就会出现并勾选,此时docker就可以正常运行了。
Flink+Kafka示例
docker的具体命令写法,这里就不写了,基本了解docker run,docker exec,dokcer images,docker ps,docker-compose应该也差不多够用。
Docker部分
这里我们首先在一个空文件夹下创建docker-compose.yml
version"3.7"# zookeeper: 172.26.0.10# kafka: 172.26.0.11# flink: - job manager 172.26.0.20# - task manager(s) random assigment in 172.26.0.0/24services ZooKeeper image ubuntu/zookeeper container_name ZooKeeper hostname ZooKeeper volumes ./data:/data ports"2181:2181" networks kafka-flink-cluster-network ipv4_address172.26.0.10 Kafka image wurstmeister/kafka ports"9092:9092" depends_on ZooKeeper environment KAFKA_ADVERTISED_HOST_NAME127.0.0.1 KAFKA_MESSAGE_MAX_BYTES 52428800 # 50M KAFKA_CREATE_TOPICS"topic_one:1:1,topic_two:3:1" KAFKA_ZOOKEEPER_CONNECT ZooKeeper2181 volumes ./kafka-logs:/kafka networks kafka-flink-cluster-network ipv4_address172.26.0.11 FlinkJobManager image flink 1.14.4-scala_2.12-java8 container_name FlinkJobManager hostname FlinkJobManager volumes ./flink-jobmanager:/data expose"6123" links Kafka networks kafka-flink-cluster-network ipv4_address172.26.0.20 ports"8081:8081" command jobmanager environment | FLINK_PROPERTIES= jobmanager.rpc.address FlinkJobManager FlinkTaskManager image flink 1.14.4-scala_2.12-java8 #container_name: FlinkTaskManager#hostname: FlinkTaskManager expose"6121""6122" depends_on FlinkJobManager command taskmanager scale2 links"FlinkJobManager:jobmanager" environment | FLINK_PROPERTIES= jobmanager.rpc.address FlinkJobManager taskmanager.numberOfTaskSlots2 networks kafka-flink-cluster-network networks kafka-flink-cluster-network# external: true driver bridge ipam configsubnet 172.26.0.0/24
然后在当前路径的powershell中输入docker-compose up -d
,等到安装结束,在docker的可视化界面就可以看到五个启动的容器,或者输入docker ps也可以看到对应的容器,需要留意的是docker容器启动后再添加端口会很麻烦,建议提前写好,也就是文件expose下添加映射。
启动完容器,我们可以输入docker exec -it docker-flink-zookerper-kafaka-Kafka-1 bash
进入kafka中,然后cd opt/kafka/bin
中找到消费者和生产者对应的shell。
Scala部分
在idea中新建一个项目,并且在pom中配置上scala和flink依赖,这里需要留意版本问题,此处flink我选用的1.11,而scala选用的是2.12。
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkStudy01</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.11.0</flink.version><scala.version>2.12</scala.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><!-- <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies></project>
然后写一个测试脚本,从kafka的生产者中读取数据,走过flink,再输出到kafka的消费者中。
packagecom.daishu//import org.apache.flink.runtime.io.network.buffer.Buffer.DataTypeimportorg.apache.flink.streaming.api._importorg.apache.flink.streaming.api.scala._importorg.apache.flink.table.api._importorg.apache.flink.table.api.bridge.scala._importorg.apache.flink.table.descriptors._objectKafkaPipeLineTest { /*flink从kafka读数据 并写入另一个kafka*/defmain(args: Array[String]): Unit= { valenv=StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "D:\\JavaProject\\FlinkStudy01\\target\\FlinkStudy01-1.0-SNAPSHOT.jar") env.setParallelism(2) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) valtableEnv=StreamTableEnvironment.create(env) /*aa 10 1547718199aa 11 1547718200aa 12 1547718203*/tableEnv.connect(newKafka() // /opt/kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor .version("0.11") // kafka版本 .topic("sensor") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat(newCsv().fieldDelimiter(',')) .withSchema(newSchema() .field("id", DataTypes.STRING()) .field("wendu", DataTypes.DOUBLE()) .field("shiJianChuo", DataTypes.BIGINT()) ) .createTemporaryTable("InputTable") // 查询转换valsensorTable=tableEnv.from("InputTable") valresultTable=sensorTable .select('id, 'wendu) .filter('id!=="111") tableEnv.connect(newKafka() // /opt/kafka_2.12-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinkTest .version("0.11") // kafka版本 .topic("sinkTest") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat(newCsv()) .withSchema(newSchema() .field("id", DataTypes.STRING()) .field("wendu", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable") resultTable.insertInto("kafkaOutputTable") tableEnv.execute("kafka pipeline test") } }
在启动代码前或者后,需要打开两个终端,进入到docker的kafka中,在bin目录下分别输入kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
和kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinkTest
来启动生产者和消费者。