Spark Streaming和Flink的Word Count对比-阿里云开发者社区

开发者社区> 大数据> 正文
登录阅读全文

Spark Streaming和Flink的Word Count对比

简介: 准备: nccat for windows/linux 都可以 通过 TCP 套接字连接,从流数据中创建了一个 Spark DStream/ Flink DataSream, 然后进行处理, 时间窗口大小为10s 因为 示例需要, 所以 需要下载一个netcat, 来构造流的输入。

准备:

nccat for windows/linux 都可以 通过 TCP 套接字连接,从流数据中创建了一个 Spark DStream/ Flink DataSream, 然后进行处理, 时间窗口大小为10s 
因为 示例需要, 所以 需要下载一个netcat, 来构造流的输入。

代码:

spark streaming

package cn.kee.spark;
public final class JavaNetworkWordCount {  
	private static final Pattern SPACE = Pattern.compile(" ");  

	public static void main(String[] args) throws Exception {  
		if (args.length < 2) {  
			System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");  
			System.exit(1);  
		}  
		StreamingExamples.setStreamingLogLevels();  
		SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");  
		JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));  
		JavaReceiverInputDStream<String> lines = ssc.socketTextStream(  
				args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);  
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {  
			@Override  
			public Iterator<String> call(String x) {  
				return Arrays.asList(SPACE.split(x)).iterator();  
			}  
		});  
		JavaPairDStream<String, Integer> wordCounts = words.mapToPair(  
				new PairFunction<String, String, Integer>() {  
					@Override  
					public Tuple2<String, Integer> call(String s) {  
						return new Tuple2<>(s, 1);  
					}  
				}).reduceByKey(new Function2<Integer, Integer, Integer>() {  
					@Override  
					public Integer call(Integer i1, Integer i2) {  
						return i1 + i2;  
					}  
				});  
		wordCounts.print();  
		ssc.start();  
		ssc.awaitTermination();  
	}  
}  




Flink DataSream


package cn.kee.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * Example :SocketWindowWordCount
 * @author keehang
 *
 */
public class SocketWindowWordCount {

	public static void main(String[] args) throws Exception {

		// the port to connect to
		final int port = 9999;
		/*try {
			final ParameterTool params = ParameterTool.fromArgs(args);
			port = params.getInt("port");
		} catch (Exception e) {
			System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
			return;
		}*/
	
		// get the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream("localhost", port, "\n");

		// parse the data, group it, window it, and aggregate the counts
		DataStream<WordWithCount> windowCounts = text
				.flatMap(new FlatMapFunction<String, WordWithCount>() {
					@Override
					public void flatMap(String value, Collector<WordWithCount> out) {
						for (String word : value.split("\\s")) {
							out.collect(new WordWithCount(word, 1L));
						}
					}
				})
				.keyBy("word")
				.timeWindow(Time.seconds(5), Time.seconds(1))
				.reduce(new ReduceFunction<WordWithCount>() {
					@Override
					public WordWithCount reduce(WordWithCount a, WordWithCount b) {
						return new WordWithCount(a.word, a.count + b.count);
					}
				});

		// print the results with a single thread, rather than in parallel
		windowCounts.print().setParallelism(1);

		env.execute("Socket Window WordCount");
	}
}																							


结果:





Spark是一种快速、通用的计算集群系统,Spark提出的最主要抽象概念是弹性分布式数据集(RDD),它是一个元素集合,划分到集群的各个节点上,可以被并行操作。用户也可以让Spark保留一个RDD在内存中,使其能在并行操作中被有效的重复使用。

Flink是可扩展的批处理和流式数据处理的数据处理平台,设计思想主要来源于Hadoop、MPP数据库、流式计算系统等,支持增量迭代计算。

总结:Spark和Flink全部都运行在Hadoop YARN上,性能为Flink > Spark > Hadoop(MR),迭代次数越多越明显,性能上,Flink优于Spark和Hadoop最主要的原因是Flink支持增量迭代,具有对迭代自动优化的功能

流式计算比较

它们都支持流式计算,Flink是一行一行处理,而Spark是基于数据片集合(RDD)进行小批量处理,所以Spark在流式处理方面,不可避免增加一些延时。Flink的流式计算跟Storm性能差不多,支持毫秒级计算,而Spark则只能支持秒级计算。

SQL支持

都支持,Spark对SQL的支持比Flink支持的范围要大一些,另外Spark支持对SQL的优化,而Flink支持主要是对API级的优化。


Spark 感觉2.x 后主要在spark sql 这里发展优势,快速Join操作,以及继续扩展sql支持。至于Flink,其对于流式计算和迭代计算支持力度将会更加增强。


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

其他文章