开发者社区> 问答> 正文

Pyspark - 打印来自Kafka的消息

我建立了一个带有生产者和消费者的kafka系统,作为消息流式传输json文件的行。

使用pyspark,我需要分析不同流媒体窗口的数据。为此,我需要查看pyspark流式传输的数据......我该怎么做?

要运行代码,我使用了Yannael的Docker容器。这是我的python代码:

Add dependencies and load modules
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'

from kafka import KafkaConsumer
from random import randint
from time import sleep

Load modules and start SparkContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \

.setAppName("Streaming test") \
.setMaster("local[2]") \
.set("spark.cassandra.connection.host", "127.0.0.1")

try:

sc.stop()

except:

pass    

sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

Create streaming task
ssc = StreamingContext(sc, 0.60)
kafkaStream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 1})
ssc.start()

展开
收起
社区小助手 2018-12-12 14:04:32 2742 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    可以调试kafkaStream.pprint()或了解有关结构化流媒体的更多信息,您也可以这样写

    query = kafkaStream \

    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
    

    query.awaitTermination()
    我看到你有cassandra端点,所以假设你写入Cassandra,你可以使用Kafka Connect而不是为此编写Spark代码

    2019-07-17 23:20:10
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载