Alibaba Cloud ECS Hands-On: Building a Kafka Application

Summary: For a course assignment, a single-node Kafka service was set up on Alibaba Cloud ECS. The application was designed and written locally in IntelliJ IDEA, and it implements uploading a text file, processing it, and persisting the result locally.

# Fundamentals of Big Data Systems: Kafka



# Question 1: With a replication factor of 2 and 3 partitions, describe how the three messages "Hello kafka", "Hello world", and "kafka" are transmitted.


Per the problem statement, the replication factor is 2 and the partition count is 3. Suppose the leader of partition 0 is on broker 1, the leader of partition 1 is on broker 2, and the leader of partition 2 is on broker 3. With a replication factor of 2, each partition has exactly two copies: the leader and one follower. Then:


- Since the message "Hello kafka" carries no key, the producer assigns it to a partition round-robin; suppose it lands on partition 0. The message is sent to broker 1, which hosts the leader of partition 0, and the follower replica of that partition (say, on broker 2) then fetches it from the leader.

- "Hello world" also has no key, so round-robin assigns it to partition 1, whose leader is on broker 2; the follower (say, on broker 3) then replicates it from there.

- "kafka" is assigned to partition 2, whose leader is on broker 3; the follower (say, on broker 1) replicates it. A minimal producer sketch for such keyless sends follows the list.
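
As an illustration, here is a minimal producer sketch that sends the three messages without keys, leaving partition assignment to the default partitioner (the question assumes round-robin; note that recent Kafka versions use a sticky partitioner for keyless records). The topic name `demo-topic` and the broker address are placeholders, not part of the assignment.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class RoundRobinDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaProducer is Closeable, so try-with-resources flushes and closes it.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String msg : new String[]{"Hello kafka", "Hello world", "kafka"}) {
                // No key is supplied, so the record is not pinned to a partition;
                // the default partitioner spreads keyless records across partitions.
                producer.send(new ProducerRecord<>("demo-topic", msg));
            }
        }
    }
}
```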


# Question 2: WordCount


## 1. Starting Kafka


1. First, start ZooKeeper, which Kafka currently depends on:


```bash
$ bin/zkServer.sh start
```


2. Next, start the Kafka server:


```bash
$ bin/kafka-server-start.sh config/server.properties
```


3. In a new terminal, create a topic and inspect it; the describe output shows the partition count, the leader, the replicas, and the in-sync replicas (Isr):


```bash
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
   Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
```


4. Run the console producer to write messages:


```bash
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
```


5. In another terminal, start the console consumer to read the messages back:


```bash
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
```


Example run:


![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled.png)


## 2. Counting Words in a Text File


To count the words in a given text file and persist the result, three components are needed: a producer that reads the file and writes its lines to the topic "streams-plaintext-input"; a Kafka Streams application that reads from "streams-plaintext-input", performs the count, and writes to "streams-wordcount-output"; and a consumer that reads the results from "streams-wordcount-output" and saves them. The full source code is given in the appendix.
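
Before starting the pipeline, both topics should already exist on the broker (unless automatic topic creation is enabled). Below is a minimal sketch, assuming the same single-node broker as in the appendix, that creates them with the AdminClient; with a single broker the replication factor must be 1.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.172.60:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition and replication factor 1, matching the single-node setup.
            admin.createTopics(Arrays.asList(
                    new NewTopic("streams-plaintext-input", 1, (short) 1),
                    new NewTopic("streams-wordcount-output", 1, (short) 1)
            )).all().get(); // block until the broker confirms creation
        }
    }
}
```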


The results are shown in the figures below:


![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%201.png)


Figure 1. The producer reads from the text file and uploads it.


![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%202.png)


Figure 2. The Streams application counts the words and sends the results to another topic.


![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%203.png)


Figure 3. The consumer reads the results and saves them.


![Untitled](%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%B3%BB%E7%BB%9F%E5%9F%BA%E7%A1%80%20%E2%80%94%20Kafka%20062843a22d2843dbb2cf316eb2191173/Untitled%204.png)


Figure 4. The output.


# Appendix: Source Code


doc.txt


```
The Stanford neurologist, a soft-spoken demi-prodigy who became a professor while still a resident, had been obsessed for a decade with how to better define psychiatric disorders. Drugs for depression or bipolar disorder didn’t work for many patients with the conditions, and he suspected the reason was how traditional diagnoses didn’t actually get at the heart of what was going on in a patient’s brain.

He had shown in 2006 that some patients with different diagnoses — PTSD vs. depression, for example — looked remarkably similar under brain imaging, suggesting clinicians drew distinctions in the wrong places. And in 2014, he showed that one could define patients by looking at individual discrete behaviors, such as attention or sleep.
```


Load.java


```java
package myapps;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

public class Load {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "8.130.172.60:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Read the input file line by line and publish each line to the
        // Streams input topic. A fixed key "1" is used; the Streams
        // application re-keys by word anyway.
        BufferedReader plaintxt =
                new BufferedReader(
                        new FileReader("C:\\Users\\yiqiao\\IdeaProjects\\kafkaDemo\\src\\main\\resources\\doc.txt"));
        String str;
        while ((str = plaintxt.readLine()) != null) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("streams-plaintext-input", "1", str);
            producer.send(record);
            System.out.println(str);
        }
        plaintxt.close();
        producer.close();
        System.exit(0);
    }
}
```


WordCount.java


```java
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.172.60:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        // Split each line into lowercase words, group by word, keep a running
        // count per word in a local state store, and stream the counts to the
        // output topic.
        builder.<String, String>stream("streams-plaintext-input")
               .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
               .groupBy((key, value) -> value)
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
               .toStream()
               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch Ctrl-C and close the streams cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
```


Output.java


```java
package myapps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Output {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Word Counter");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.130.172.60:9092");
        // Without this, a new consumer group starts from the latest offset and
        // would miss counts produced before the consumer starts.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The counts are Longs, so the value type must match the
        // LongDeserializer configured above.
        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("streams-wordcount-output"));

        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
            // Append each batch of results to the local output file.
            BufferedWriter count_plain = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream("C:\\Users\\yiqiao\\IdeaProjects\\kafkaDemo\\src\\main" +
                            "\\resources\\count_plain.txt", true)));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.println(record.key() + " " + record.value());
                count_plain.write(record.key() + " " + record.value() + "\n");
            }
            count_plain.close();
        }
    }
}
```


pom.xml


```xml

<?xml version="1.0" encoding="UTF-8"?>

<!--

  Licensed to the Apache Software Foundation (ASF) under one or more

  contributor license agreements.  See the NOTICE file distributed with

  this work for additional information regarding copyright ownership.

  The ASF licenses this file to You under the Apache License, Version 2.0

  (the "License"); you may not use this file except in compliance with

  the License.  You may obtain a copy of the License at


      http://www.apache.org/licenses/LICENSE-2.0


  Unless required by applicable law or agreed to in writing, software

  distributed under the License is distributed on an "AS IS" BASIS,

  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  See the License for the specific language governing permissions and

  limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>


   <groupId>streams.examples</groupId>

   <artifactId>streams.examples</artifactId>

   <version>0.1</version>

   <packaging>jar</packaging>


   <name>Kafka Streams Quickstart :: Java</name>


   <properties>

       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

       <kafka.version>3.0.0</kafka.version>

       <slf4j.version>1.7.7</slf4j.version>

       <log4j.version>1.2.17</log4j.version>

   </properties>


   <repositories>

       <repository>

           <id>apache.snapshots</id>

           <name>Apache Development Snapshot Repository</name>

           <url>https://repository.apache.org/content/repositories/snapshots/</url>

           <releases>

               <enabled>false</enabled>

           </releases>

           <snapshots>

           </snapshots>

       </repository>

   </repositories>


   <!--

       Execute "mvn clean package -Pbuild-jar"

       to build a jar file out of this project!

   -->


   <build>

       <plugins>

           <plugin>

               <groupId>org.apache.maven.plugins</groupId>

               <artifactId>maven-compiler-plugin</artifactId>

               <version>3.1</version>

               <configuration>

                   <source>1.8</source>

                   <target>1.8</target>

               </configuration>

           </plugin>

       </plugins>


       <pluginManagement>

           <plugins>

               <plugin>

                   <artifactId>maven-compiler-plugin</artifactId>

                   <configuration>

                       <source>1.8</source>

                       <target>1.8</target>

                       <compilerId>jdt</compilerId>

                   </configuration>

                   <dependencies>

                       <dependency>

                           <groupId>org.eclipse.tycho</groupId>

                           <artifactId>tycho-compiler-jdt</artifactId>

                           <version>0.21.0</version>

                       </dependency>

                   </dependencies>

               </plugin>

               <plugin>

                   <groupId>org.eclipse.m2e</groupId>

                   <artifactId>lifecycle-mapping</artifactId>

                   <version>1.0.0</version>

                   <configuration>

                       <lifecycleMappingMetadata>

                           <pluginExecutions>

                               <pluginExecution>

                                   <pluginExecutionFilter>

                                       <groupId>org.apache.maven.plugins</groupId>

                                       <artifactId>maven-assembly-plugin</artifactId>

                                       <versionRange>[2.4,)</versionRange>

                                       <goals>

                                           <goal>single</goal>

                                       </goals>

                                   </pluginExecutionFilter>

                                   <action>

                                       <ignore />

                                   </action>

                               </pluginExecution>

                               <pluginExecution>

                                   <pluginExecutionFilter>

                                       <groupId>org.apache.maven.plugins</groupId>

                                       <artifactId>maven-compiler-plugin</artifactId>

                                       <versionRange>[3.1,)</versionRange>

                                       <goals>

                                           <goal>testCompile</goal>

                                           <goal>compile</goal>

                                       </goals>

                                   </pluginExecutionFilter>

                                   <action>

                                       <ignore />

                                   </action>

                               </pluginExecution>

                           </pluginExecutions>

                       </lifecycleMappingMetadata>

                   </configuration>

               </plugin>

           </plugins>

       </pluginManagement>

   </build>


   <dependencies>

       <!-- Apache Kafka dependencies -->

       <dependency>

           <groupId>org.apache.kafka</groupId>

           <artifactId>kafka-streams</artifactId>

           <version>${kafka.version}</version>

       </dependency>

       <dependency>

           <groupId>org.slf4j</groupId>

           <artifactId>slf4j-api</artifactId>

           <version>${slf4j.version}</version>

       </dependency>

       <dependency>

           <groupId>org.slf4j</groupId>

           <artifactId>slf4j-simple</artifactId>

           <version>${slf4j.version}</version>

       </dependency>

   </dependencies>


</project>

```
