[Spark][Streaming]Spark读取网络输入的例子

简介:

Spark读取网络输入的例子:

参考如下的URL进行试验

https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
http://www.cnblogs.com/FG123/p/5324743.html

发现 先执行 nc -lk 9999 ,再执行 spark 程序之后, 
如果停止 nc ,spark程序会报错:

类似于:

复制代码

-------------------------------------------
Time: 2017-10-28 19:32:02
-------------------------------------------

17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

复制代码

 

这表明,两者已经建立 的 通信。但是没有看到预想的 word count 输出。我猜测是 用于参与计算的进程数不够,所以进行如下改动:

sc = SparkContext("local[2]", "streamwordcount")

改为:

sc = SparkContext("local[3]", "streamwordcount")

整个程序如下:

复制代码

[training@localhost ab]$ cat test.py
#showing remote messages

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

sc = SparkContext("local[3]", "streamwordcount")
# 创建本地的SparkContext对象,包含3个执行线程

ssc = StreamingContext(sc, 2)
# 创建本地的StreamingContext对象,处理的时间片间隔时间,设置为2s

lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda line: line.split(" "))
# 使用flatMap和Split对2秒内收到的字符串进行分割

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()

ssc.start() 
# 启动Spark Streaming应用

ssc.awaitTermination()

复制代码

再次运行 nc 程序

[training@localhost ~]$ nc -lk 9999

运行 spark 程序:

[training@localhost ~]$ spark-submit /home/training/ab/test.py

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

 

在nc窗口中输入一些数据:

复制代码

aaa bbb ccc
ddd aaa sss
sss bbb bbb

kkk jjj mmm
ooo kkk jjj
mmm ccc ddd
eee fff sss
rrr nnn ooo
ppp sss zzz
mmm sss ttt
kkk sss ttt
rrr ooo ppp
kkk qqq kkk
lll nnn jjj
rrr ooo sss
kkk aaa ddd
aaa aaa fff
eee sss nnn
ooo ppp qqq
qqq sss eee
sss mmm nnn

复制代码

 

此时,经过一小会,可以看到,spark 程序的窗口输出:

复制代码

------------------------------------------- 
Time: 2017-10-28 19:33:50
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:52
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:54
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:56
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:58
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:00
-------------------------------------------
(u'', 1)
(u'mmm', 2)
(u'bbb', 3)
(u'nnn', 1)
(u'ccc', 2)
(u'rrr', 1)
(u'sss', 3)
(u'fff', 1)
(u'aaa', 2)
(u'ooo', 2)
...

------------------------------------------- 
Time: 2017-10-28 19:34:02
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:04
-------------------------------------------
(u'ppp', 1)
(u'sss', 1)
(u'zzz', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:06
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:08
-------------------------------------------
(u'mmm', 1)
(u'sss', 1)
(u'ttt', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:10
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:12
-------------------------------------------
(u'sss', 1)
(u'ttt', 1)
(u'kkk', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:14
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:16
-------------------------------------------
(u'ppp', 1)
(u'rrr', 1)
(u'ooo', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:18
-------------------------------------------
(u'qqq', 1)
(u'kkk', 2)

------------------------------------------- 
Time: 2017-10-28 19:34:20
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:22
-------------------------------------------

复制代码
目录
相关文章
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
61 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
118 0
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
76 0
|
3月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
65 0
|
3月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
66 0
|
3月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
50 0
|
3月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
42 0
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
45 0
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
58 0