CreateDirectStream 消费数据|学习笔记

简介: 快速学习 CreateDirectStream 消费数据

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)CreateDirectStream 消费数据】学习笔记与课程紧密联系,让用户快速学习知识

课程地址https://developer.aliyun.com/learning/course/670/detail/11627


CreateDirectStream 消费数据

 

内容介绍:

一、CreateDirectStream 消费数据的步骤

二、CreateDirectStream 的代码实现

 

一、CreateDirectStream 消费数据的步骤

目标:掌握CreateDirectStream 消费数据的步骤

1、创建Spark conf

2、创建SparkContext

3、创建Streaming Context

4、读取kafka 内的数据ssc,kafkaParams,topics)

5、消费数据

6、开启 Streaming 任务+开启循环

 

二、CreateDirectStream 的代码实现

来到开发环境中,打开 ispider 并将其中的 main 关掉,找到test ,右键点击 scala 后,将复制出的CreateDirectStream 新建到Scala 的Object ,输入TestCreateDirectStream。

接下来处理消费数据的整个流程,用CreateDirectStream 的方法来读取并消费。

如下:

1、程序的入口

首先看代码逻辑,要执行该操作,就要创建出一个main 方法。

object TestCreateDirectStream {

//程序的入口

def main(args: Array[string]): Unit = {

//1创建 spark conf

valconf=newSparkConf().setMaster("local[2]").setAppName ("TestCreateDirectstream")

//2、创建 SparkContext

val sc=new SparkContext(conf)

//3、创建 streaming Context

val ssc=new StreamingContext(sc,Seconds(2))

//4、读取 kafka 内的数据 ssc,kafkaParams,topics)

KafkaUtils.createDirectstream()

//其中 createDirectstream() 会爆红,因为createDirectstream() 中需要很多参数,但实际里面没有参数。

查看缺的参数需要按ctrl ,会出现很多用法。需要用到(其中Class 参数是无用的):

def createDirectStream [K, V, KD <: Decoder[K], VD <: Decoder[V]](

jssc: JavaStreamingContext,

keyClass: Class[K],

valueClass: Class[V],

keyDecoderClass: Class[KD],

valueDecoderClass: Class[VD],

kafkaParams: JMap[String, String],

topics: JSet[String]

//实例kafkaParams

val kafkaParams = Map ("bootstrap.servers "->" 192.168.100.100:9092,192.168.100.110:9092,192.168.100.120:9092")

//实例 topics

val topics=Set("test01")

//接收数据

val kafkaDatas = KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

//只获取value 数据

val kafkaValue=kafkaDatas.map(_._2)

//5、消费数据

kafkaValue.foreachRDD(rdd=>rdd.foreach(println))

//6、开启 streaming 任务+开启循环

ssc.start()

ssc.awaitTermination()

相关文章
|
3月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
51 1
|
5月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
153 3
|
7月前
|
消息中间件 网络协议 物联网
消息队列 MQ产品使用合集之如何让消费者不从最开始进行消费,而是从最后一条消息开始消费
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 Linux 开发工具
消息队列 MQ产品使用合集之重复消费一般是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
负载均衡 网络性能优化
EMQ如何保证消息不重复消费?
EMQ(Erlang MQTT Broker)通过以下机制来保证消息不重复消费
795 2
|
8月前
|
消息中间件 监控 中间件
【工作中问题解决实践 十一】Kafka消费者消费堆积且频繁rebalance
【工作中问题解决实践 十一】Kafka消费者消费堆积且频繁rebalance
522 0
|
消息中间件 存储 安全
RocketMQ-消息消费模式 顺序消费
RocketMQ-消息消费模式 顺序消费
228 0
|
消息中间件 负载均衡 Java
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2775 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 存储 RocketMQ
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
756 0
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?

热门文章

最新文章