我建立了一个带有生产者和消费者的kafka系统,作为消息流式传输json文件的行。
使用pyspark,我需要分析不同流媒体窗口的数据。为此,我需要查看pyspark流式传输的数据......我该怎么做?
要运行代码,我使用了Yannael的Docker容器。这是我的python代码:
Add dependencies and load modules
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
from kafka import KafkaConsumer
from random import randint
from time import sleep
Load modules and start SparkContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
.setAppName("Streaming test") \
.setMaster("local[2]") \
.set("spark.cassandra.connection.host", "127.0.0.1")
try:
sc.stop()
except:
pass
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
Create streaming task
ssc = StreamingContext(sc, 0.60)
kafkaStream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 1})
ssc.start()
可以调试kafkaStream.pprint()或了解有关结构化流媒体的更多信息,您也可以这样写
query = kafkaStream \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
我看到你有cassandra端点,所以假设你写入Cassandra,你可以使用Kafka Connect而不是为此编写Spark代码
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。