首页> 标签> 流计算
"流计算"
共 10255 条结果
全部 问答 文章 公开课 课程 电子书 技术圈 体验
Storm中Trident框架详解
一、Storm Trident概述及特性Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。好比 Mapreduce框架的Pig,Trident是Storm提供的基于基本API开发的高级框架。Trident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式 处理。Trident是完全容错的,拥有有且只有一次处理的语义,其实就是transactional的高级封装。这就让你可以很轻松 的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复 这些状态。Trident封装了transactional事务类,所以我们不再需要学习Batch相关的基础API了,减轻了学习成本。trident每次处理消息均以batch为单位,即一次处理多个元组trident是storm的更高层次抽象,主要提供了3个方面的好处:(1)常用的count,sum等封装成了方法,可以直接调用不需要自己实现。(2)提供一次原语,如groupby等。(3)提供事务支持,可以保证数据均处理且只处理了一次(恰好一次)如果我们开发一个对文本中的词频进行统计的程序,使用Storm框架的话我们需要开发三个Storm组件:1.一个Spout负责收集文本信息并分段,做为sentence字段发送给下游的Bolt2.一个Bolt将每段文本分词,将分词结果以word字段发送给下游的Bolt3.一个Bolt对词频进行统计,把统计结果记录在count字段并存储如果使用Trident我们可以使用一下代码完成上述操作:二、TridentTopology与StormToplogy(1)区别StormToplogy:TridentTopology(2)联系spout的消息流涉及到的核心类三、TridentTopology的可靠性机制详解四、Storm Trident API讲解
文章
存储  ·  自然语言处理  ·  分布式计算  ·  API  ·  数据处理  ·  数据库  ·  流计算
2022-05-15
Storm容错机制Acker
(1)基于tuple树的Acker详解Storm中有个特殊的Executor叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树。当acker发现一个Tuple树 已经处理完成了,它会告诉框架回调Spout的ack(),否则回调Spout的fail()。Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple。但很遗憾的是,框架不会自动 重新发送,需要我们自己手工编码实现。(2)如何开启或者关闭Acker(1)如何开启AckerSpout类中emit(java.util.List<java.lang.Object> tuple, java.lang.Object messageId)这个messageId需要是唯一标识(可以用UUID),如此就可以追踪该tuple。 Bolt类里进行collector.ack(tuple),当成功执行tuple后,会自动回调Spout类的ack()函数。要实现ack机制,必须在spout发射tuple的时候指定messageId。并且需要在spout中对tuple进行缓存,对于ack的 tuple则从缓存队列中删除,对于fail的tuple可以选择重发。(2)如何关闭AckerSpout类中emit(java.util.List<java.lang.Object> tuple),无messageId则自动 不追踪。或通过如下关闭Ack:Config conf = new Config() ; conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); (3)Acker机制的编程模型(4)Storm可靠性实现的设计思想(1)需要实现可靠性的场景什么情况下失败处理的Tuple期望能重发处理?数据完整性要求高,比如银行系统,对电商来讲的话如订单统计。该类数据通常规范很严格,不会出现异常格式的数据,一旦Tuple处理失败,通常是集群、网络等原因。海量日志统计的场景呢?不需要开启Acker。(2)可靠性设计 — Acker失败重发编码实现思路Spout里需要做什么?在Spout emit时添加一个MsgID,那么ack和fail方法将会被调用当Tuple被正确地处理了或发生了错误。collector.emit(new Values("field1", "field2") ,msgID); 缓存emit过的tuple,调用ack()时从缓存里删掉成功执行的tuple;调用fail()时缓存失败处理的tuple及失败次数 ,如 果不大于3次则重发,否则从缓存里删除。Bolt里需要做什么?Bolt里显性回调:this.collector.ack(input);或 this.collector.fail(input);Tuple数据缓存方案:如何缓存Tuple?Spout里进行缓存,本课程将数据缓存到内存。 实际项目中,一定要将数据缓存到外部存储系统,比如比如redis或hbase如何控制重发次数为3次?建一个Map1,缓存发送过的Tuple,成功执行的Tuple从里删除;Map2 存发送失败的Tuple及失败次数,当超过3次 时从Map2里删除。(5)Storm可靠性及Acker失败重发编码实现order.log数据文件0 60.0 2017-03-21 17:33:40 6 334 1 60.0 2017-03-21 17:33:40 6 856 2 20.10 2017-03-21 17:33:40 1 2132 3 60.0 2017-03-21 17:33:40 4 4456 4 80.1 2017-03-21 17:33:40 7 678 5 10.10 2017-03-21 17:33:40 5 2132 6 80.1 2017-03-21 17:33:40 1 678 7 60.0 2017-03-21 17:33:40 6 334 8 80.1 2017-03-21 17:33:40 4 678 9 10.10 2017-03-21 17:33:40 5 798 0 60.0 2017-03-21 17:33:40 6 334 1 60.0 2017-03-21 17:33:40 6 856 2 20.10 2017-03-21 17:33:40 1 2132 3 60.0 2017-03-21 17:33:40 4 4456 4 80.1 2017-03-21 17:33:40 7 678 5 10.10 2017-03-21 17:33:40 5 2132 6 80.1 2017-03-21 17:33:40 1 678 7 60.0 2017-03-21 17:33:40 6 334 8 80.1 2017-03-21 17:33:40 4 678 9 10.10 2017-03-21 17:33:40 5 798 7 60.0 2017-03-21 17:33:40 6 334 8 80.1 2017-03-21 17:33:40 4 678 9 10.10 2017-03-21 17:33:40 5 798 8 80.1 2017-03-21 17:33:40 4 678 9 10.10 2017-03-21 17:33:40 5 798 8 80.1 2017-03-21 17:33:40 4 678 9 10.10 2017-03-21 17:33:40 5 798 8 80.1 2017-03-21 17:33:40 4 678 9 10.10 2017-03-21 17:33:40 5 798 9 10.10 2017-03-21 17:33:40 5 798 9 10.10 2017-03-21 17:33:40 5 798 9 10.10 2017-03-21 17:33:40 5 798 MainTopology:package com.kfk.ackFail; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 10:28 下午 */ public class MainTopology { public static void main(String[] args) { // 创建Topology TopologyBuilder builder = new TopologyBuilder(); // set Spout builder.setSpout("spout", new AckSpout(), 1); // set Bolt builder.setBolt("bolt1", new AckBolt1(), 1).shuffleGrouping("spout"); builder.setBolt("bolt2", new AckBolt2(), 1).shuffleGrouping("bolt1"); Config conf = new Config(); conf.setDebug(false); try { if (args != null && args.length > 0) { StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("mytopology", conf, builder.createTopology()); } } catch (Exception e){ e.printStackTrace(); } } } AckSpout:package com.kfk.ackFail; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 10:36 下午 */ public class AckSpout implements IRichSpout { FileInputStream fileInputStream = null; InputStreamReader inputStreamReader = null; BufferedReader bufferedReader = null; /** * 线程安全的Map,存储emit过的tuple */ private ConcurrentHashMap<Object, Values> pending; /** * 存储失败的tuple和其失败次数 */ private ConcurrentHashMap<Object, Integer> failpending; SpoutOutputCollector spoutOutputCollector = null; String str = null; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.spoutOutputCollector = collector; this.fileInputStream = new FileInputStream("order.log"); this.inputStreamReader = new InputStreamReader(fileInputStream,"UTF-8"); this.bufferedReader = new BufferedReader(inputStreamReader); this.pending = new ConcurrentHashMap<Object, Values>(); this.failpending = new ConcurrentHashMap<Object, Integer>(); } catch (Exception e){ e.printStackTrace(); } } @Override public void close() { // TODO Auto-generated method stub try { bufferedReader.close(); inputStreamReader.close(); fileInputStream.close(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } @Override public void activate() { // TODO Auto-generated method stub } @Override public void deactivate() { // TODO Auto-generated method stub } @Override public void nextTuple() { try { while ((str = this.bufferedReader.readLine()) != null){ // 过滤动作 UUID msgId = UUID.randomUUID(); String[] arr = str.split("\t"); String date = arr[2].substring(0,10); String orderAmt = arr[1]; Values val = new Values(date,orderAmt); this.pending.put(msgId,val); spoutOutputCollector.emit(val,msgId); System.out.println("pending.size()="+pending.size()); } } catch (Exception e){ // TODO: handle exception e.printStackTrace(); } } @Override public void ack(Object msgId) { // TODO Auto-generated method stub System.out.println("pending size 共有:"+pending.size()); System.out.println("spout ack:"+msgId.toString()+"---"+msgId.getClass()); this.pending.remove(msgId); System.out.println("pending size 剩余:"+pending.size()); } @Override public void fail(Object msgId) { // TODO Auto-generated method stub System.out.println("spout fail:"+msgId.toString()); // 获取该Tuple失败的次数 Integer failcount = failpending.get(msgId); if (failcount == null){ failcount = 0; } failcount++; if (failcount >= 3){ // 重试次数已满,不再进行重新emit failpending.remove(msgId); } else { // 记录该tuple失败次数 failpending.put(msgId, failcount); // 重发 this.spoutOutputCollector.emit(this.pending.get(msgId), msgId); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("date","orderAmt")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } AckBolt1:package com.kfk.ackFail; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 11:00 下午 */ public class AckBolt1 implements IRichBolt { OutputCollector outputCollector = null; TopologyContext topologyContext = null; int num = 0; String url = null; String session_id = null; String date = null; String province_id = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method this.outputCollector = collector; this.topologyContext = context; } @Override public void execute(Tuple input) { try { date = input.getStringByField("date") ; Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt")); // 注意参数,第一个参数是Tuple本身 outputCollector.emit(input,new Values(date,String.valueOf(orderAmt))); outputCollector.ack(input); } catch (Exception e){ outputCollector.fail(input); e.printStackTrace(); } } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("date","orderAmt")) ; } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } } AckBolt2:package com.kfk.ackFail; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 11:28 下午 */ public class AckBolt2 implements IRichBolt { OutputCollector outputCollector = null; TopologyContext topologyContext = null; int num = 0; String url = null; String session_id = null; String date = null; String province_id = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method this.outputCollector = collector; this.topologyContext = context; } @Override public void execute(Tuple input) { try { date = input.getStringByField("date") ; Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt")); outputCollector.ack(input); } catch (Exception e){ outputCollector.fail(input); e.printStackTrace(); } } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("")) ; } @Override public Map<String, Object> getComponentConfiguration() { return null; } } 运行结果:
文章
缓存  ·  算法  ·  NoSQL  ·  分布式数据库  ·  Redis  ·  流计算  ·  Hbase
2022-05-15
Storm时间窗口
(1)时间窗口概念什么是时间窗口?按一定的时间或者tuple数量来定义打包处理的大小称之为时间窗口。使用场景:计算每隔10分钟的销售额。计算上1分钟成交金额计算近1个小时最火爆的微博等。Storm的时间窗口机制Storm可同时处理窗口内的所有tuple。窗口可以从时间或数量上来划分,就是说可以根据时间段或 Tuple数量来定窗口大小。有如下两种时间窗口:滚动窗口 Tumbling Window滑动窗口 Sliding Window注意:时间窗口的计算,通常都是单并发(Executer)。(1)滚动窗口 Tumbling Window按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。在这里插入代码片........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... -5 0 5 10 15 -> time |<------- w1 -->| |<---------- w2 ----->| |<-------------- w3 ---->| 因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10)) (2)滑动窗口 Tumbling Window按照固定的时间间隔或者Tuple数量划分窗口。 每个Touple仅处在一个窗口里,每个窗口之间无交互。| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... 0 5 10 15 -> time w1 w2 w3 因素:窗口大小,时间或Tuple数据,通过参数设定。 “步长”就是窗口大小。BaseWindowedBolt .withTumblingWindow(Duration.of(毫秒数)或者 Count.of(10)) 该窗口每5秒评估一次,第一个窗口中的一些元组与第二个窗口重叠。数据以步长为增量,按照窗口大小从右到左来确定要处理的数据量,并 去掉最左末尾数据。(2)滚动窗口程序开发及测试MainTopology:package com.kfk.window; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseWindowedBolt; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 11:58 上午 */ public class MainTopology { public static void main(String[] args) { // 创建Topology TopologyBuilder builder = new TopologyBuilder(); // set Spout builder.setSpout("spout",new AmtSpout()); // set 滚动窗口 Tumbling Window builder.setBolt("amtBolt",new TumblingBolt() .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)),1) .shuffleGrouping("spout"); // 设置日志等级 Config conf = new Config(); conf.setDebug(false); try { if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("filePrint", conf, builder.createTopology()); } }catch (Exception e){ e.printStackTrace(); } } } AmtSpout:package com.kfk.window; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; import java.util.Random; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 12:24 下午 */ public class AmtSpout implements IRichSpout { Integer[] amt = {10,20,30,40}; String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"}; String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"}; String[] product = {"java","python","c","scala"}; Random random = new Random(); SpoutOutputCollector spoutOutputCollector = null; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { spoutOutputCollector = collector; } catch (Exception e) { e.printStackTrace(); } } @Override public void close() { } @Override public void activate() { } @Override public void deactivate() { } @Override public void nextTuple() { try { // 模拟数据 int _amt = amt[random.nextInt(4)]; String _time = time[random.nextInt(4)]; String _city = city[random.nextInt(5)]; String _product = product[random.nextInt(4)]; // emit给Bolt节点 spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product)); Thread.sleep(1000); } catch (Exception e){ e.printStackTrace(); } } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // set Fields declarer.declare(new Fields("amt","time","city","product")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } TumblingBolt:package com.kfk.window; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.tuple.Tuple; import org.apache.storm.windowing.TupleWindow; import java.util.List; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 12:09 下午 */ public class TumblingBolt extends BaseWindowedBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { super.prepare(stormConf, context, collector); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { super.declareOutputFields(declarer); } @Override public void execute(TupleWindow inputWindow) { List<Tuple> list = inputWindow.get(); int amt = 0; for (Tuple tuple:list){ amt += Integer.parseInt(tuple.getStringByField("amt")); } System.out.println(amt); } } (2)滑动窗口程序开发及测试MainTopology:package com.kfk.window; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseWindowedBolt; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 11:58 上午 */ public class MainTopology { public static void main(String[] args) { // 创建Topology TopologyBuilder builder = new TopologyBuilder(); // set Spout builder.setSpout("spout",new AmtSpout()); // set 滑动窗口 Sliding Window builder.setBolt("amtBolt",new SliderBolt() .withWindow(BaseWindowedBolt.Duration.seconds(5), BaseWindowedBolt.Duration.seconds(2)),1) .shuffleGrouping("spout"); // 设置日志等级 Config conf = new Config(); conf.setDebug(false); try { if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("filePrint", conf, builder.createTopology()); } }catch (Exception e){ e.printStackTrace(); } } } SliderBolt:package com.kfk.window; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.tuple.Tuple; import org.apache.storm.windowing.TupleWindow; import java.util.List; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/29 * @time : 12:18 下午 */ public class SliderBolt extends BaseWindowedBolt { int amt = 0; OutputCollector outputCollector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { outputCollector = collector; super.prepare(stormConf, context, collector); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { super.declareOutputFields(declarer); } @Override public void execute(TupleWindow inputWindow) { List<Tuple> listNew = inputWindow.getNew(); List<Tuple> listExp = inputWindow.getExpired(); for (Tuple tuple : listNew){ amt += Integer.parseInt(tuple.getStringByField("amt")); } for (Tuple tuple : listExp){ int expAmt= Integer.parseInt(tuple.getStringByField("amt")); amt -=expAmt; } System.out.println("近3秒订单:"+amt); } }
文章
流计算
2022-05-15
Storm拓扑并行度与及流分组策略
一、Storm拓扑的并行度(parallelism)介绍(1)运行拓扑的结构工作进程: Worker Process,也称为Worker执行器: Executor,即线程Thread任务: Task工作进程、执行器、任务三者之间关系如下图:Topology由一个或多个Spout/Bolt组件构成。运行中的Topology由一个或多个Supervisor节点中的Worker构成。默认情况下一个Supervisor节点运行4个Worker,由defaults.yaml/storm.yaml中的属性决定:supervisor.slots.ports:     - 6700     - 6701     - 6702     - 6703 在代码中可以使用new Config().setNumWorkers(3),最大数量不能超过配置的supervisor.slots.ports数量。Executor为特定拓扑的一个或多个组件Spout/Bolt实例运行一个或多个Task。默认情况下一个Executor运行一个Task。Task执行真正的数据处理,代码中实现的每个Spout/bolt作为很多任务跨集群执行。一个Spout/Bolt组件的Task数量始终贯穿Topology的整个生命周期,但一个Spout/Bolt组件的Executor数量会随着时间而改变。这意味着Threads≤Tasks条件成立。默认情况下Task数量与Executor数量相同,即Storm会使用每个Executor运行一个Task。(2)配置拓扑的并行度这里所说的术语“并行度”主要是用于表示所谓的 parallelism_hint,它代表着一个组件的初始 executor (也是线程)数量。在这篇文章里,我们使用这个“并行度”术语来说明在 Storm 拓扑中既可以配置 executor 的数量,也可以配置 worker 和 task 的数量。如果“并行度”的概念需要表示其他的一般情况,我们也会特别指出。下面的内容里显示了很多可配置选项,以及在代码中配置他们的方法。可以用于配置的方法有很多种,这里列出的只是其中一部分。另外需要注意的是:Storm 的配置优先级为 defaults.yaml < storm.yaml < 拓扑配置 < 内置型组件信息配置 < 外置型组件信息配置。(1)工作进程Worker数量说明:拓扑在集群中运行所需要的工作进程数配置选项:TOPOLOGY_WORKERS在代码中使用Config config = new Config(); //注意此参数不能大于supervisor.slots.ports数量。 config.setNumWorkers(3); (2)执行器Executor数量说明:每个组件需要的执行线程数配置选项:(没有拓扑级的通用配置项)在代码中使用TopologyBuilder builder = new TopologyBuilder(); //设置Spout的Executor数量参数parallelism_hint builder.setSpout(id, spout, parallelism_hint); //设置Bolt的Executor数量参数parallelism_hint builder.setBolt(id, bolt, parallelism_hint); (3)任务Task数量说明:每个组件需要的执行任务数配置选项:TOPOLOGY_TASKS在代码中使用TopologyBuilder builder = new TopologyBuilder(); //设置Spout的Executor数量参数parallelism_hint,Task数量参数val builder.setSpout(id, spout, parallelism_hint).setNumTasks(val); //设置Bolt的Executor数量参数parallelism_hint,Task数量参数val builder.setBolt(id, bolt, parallelism_hint).setNumTasks(val); (3)改变运行中拓扑的并行度Storm一个很好的特性是可以增加或减少工作进程Worker和Executor的数量而不需要重启集群或拓扑,这样的行为成为再平衡(rebalancing)。目前有两种方式可实现拓扑再平衡,如下:使用Storm的WebUI使用Storm的命令行工具,如下# 重新配置拓扑 # “myTopology” 拓扑使用5个Worker进程 # “blue-spout” Spout使用3个Executor # “yellow-blot” Bolt使用10个Executor 二、Streaming Groupings流分组策略数据从上游节点发送到下游节点时,当下游节点的并发度大于1时,我们对下 游节基于多并发情况下接受并处理数据的策略称之为分组策略。stream grouping就是用来定义一个stream应该如何分配给Bolts上面的多个并发。掌握Shuffle Grouping和 Fields Grouping 即可。storm里面有6种类型的stream grouping。1.ShuffleGrouping: 用在非聚合计算,比如过滤、写库等功能性操作 随机派发stream里面的tuple,尽量保证每个bolt并发接收到的tuple数目相同,但不严格相同。0.10之前,shuffleGrouping是轮询分配,即每个bolt得到的数据量相同。2.FieldsGrouping: 按Field分组进行聚合场景,比如按word来分组, 具有同样word会被分到相同的Bolt。作用:过滤,从源端(Spout或上一级Bolt)多输出Fields中选择某些Field相同的tuple会分发给同一个Executer或task处理典型场景: 去重操作、Join2.Non Grouping: 无分组, 这种分组和Shuffle grouping是一样的效果,多线程下不平均分配。4.All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。5.Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。6.Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者决定由消息接收者的哪个task处理这 个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理 者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)通过Mapreduce来对比FiledsGroupingSQL:select count(1), word from tableName group by word; MR:<key, value> —— <key, values> 把相同的key进行了聚合 shuffle混淆以后,相同的key发送到同一个reduce进程(线程)里,才能确保该key进行全局聚合。数据倾斜根本原因:有的key的value少,有的多,两级分化严重。Storm:需要确保相同的key(tuple)必须发送给同一个bolt进程(线程),用fiedsGrouping来实现。三、基于流分组策略应用程序的开发案例需求:汇总每天的订单的交易量MainTopology:package com.kfk.pro1Grouping; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:47 下午 */ public class MainTopology { public static void main(String[] args) { // 创建Topology TopologyBuilder builder = new TopologyBuilder(); // set Spout builder.setSpout("spout",new AmtSpout()); // set Bolt,按Field(time)分组,并设置并行度,是这个组件有几个executor来执行 builder.setBolt("amtBolt",new AmtBolt(),4).fieldsGrouping("spout",new Fields("time")); builder.setBolt("printBolt",new PrintBolt(),2).shuffleGrouping("amtBolt"); // 设置日志等级 Config conf = new Config(); conf.setDebug(false); try { // 本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("amtTopo", conf, builder.createTopology()); } catch (Exception e){ e.printStackTrace(); } } } AmtSpout:package com.kfk.pro1Grouping; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; import java.util.Random; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:52 下午 */ public class AmtSpout implements IRichSpout { Integer[] amt = {10,20,30,40}; String[] time = {"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"}; String[] city = {"beijing","nanjing","shenzhen","shanghai","guangzhou"}; String[] product = {"java","python","c","scala"}; SpoutOutputCollector spoutOutputCollector = null; Random random = new Random(); @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { spoutOutputCollector = collector; } @Override public void close() { } @Override public void activate() { } @Override public void deactivate() { } @Override public void nextTuple() { // 模拟数据 int _amt = amt[random.nextInt(4)]; String _time = time[random.nextInt(4)]; String _city = city[random.nextInt(5)]; String _product = product[random.nextInt(4)]; // emit给Bolt节点 spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product)); } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // set Fields declarer.declare(new Fields("amt","time","city","product")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } AmtBolt:package com.kfk.pro1Grouping; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:50 下午 */ public class AmtBolt implements IRichBolt { Map<String,Integer> amtMap = new HashMap<String,Integer>(); OutputCollector outputCollector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { outputCollector = collector; } @Override public void execute(Tuple input) { // Bolt业务逻辑处理 String time = input.getStringByField("time"); int amt = Integer.parseInt(input.getStringByField("amt")); // 累加amt次数 if (amtMap.get(time) != null){ amt += amtMap.get(time); } amtMap.put(time,amt); // emit给Bolt节点 outputCollector.emit(new Values(amtMap)); } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("res")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } PrintBolt:package com.kfk.pro1Grouping; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/28 * @time : 4:51 下午 */ public class PrintBolt implements IRichBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { System.out.println(input.getValue(0)); } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; } } 运行结果:{2020-12-27 12:43=43471960, 2020-12-23 12:43=43497930} {2020-12-27 12:43=43472120, 2020-12-23 12:43=43498060} {2020-12-18 12:43=43479050, 2020-12-25 12:43=43458280} {2020-12-18 12:43=43480060, 2020-12-25 12:43=43459440} {2020-12-18 12:43=43480120, 2020-12-25 12:43=43459470} {2020-12-18 12:43=43480280, 2020-12-25 12:43=43459610} ...
文章
SQL  ·  分布式计算  ·  数据处理  ·  流计算
2022-05-15
Storm架构原理详解!
一、Storm概述Apache Storm是一个免费的开源分布式实时计算系统。 Storm可以轻松地处理无限数据流, 并且实时处理Hadoop批处理任务。 可以和任何编程语言一起使用,并且使用起来非常有趣!Storm有许多用例:实时分析,在线机器学习,连续计算,分布式RPC,ETL等等。 Storm是快速 的:一个基准计算每个节点每秒处理超过一百万个元组。 它具有可扩展性,容错性,可确保您的 数据得到处理,且易于设置和操作。Storm集成了您已经使用的队列和数据库技术。 Storm拓扑消耗数据流,并以任意复杂的方式处理 这些流,然后重新分配计算的每个阶段之间的流。二、Storm特性Storm特点:开源分布式流式计算无顺序流数据计算到达海量数据无穷无尽计算速度快扩展性、容错性、可靠性、高可用性、易用性storm应用场景:推荐系统(实时推荐,根据下单或加入购物车推荐相关商品)、金融系统、预警系统、网站统计(实时销量、流量统计,如淘宝双11效果图)、交通路况实时系统等等。1.流数据处理,Storm可以用来处理源源不断流进来的消息,处理之后将结果写入到某个存储中去。2.分布式rpc,由于storm的处理组件是分布式的,而且处理延迟极低,所以可以作为一个通用的分布式rpc框架来使用。3.持续计算,任务一次初始化,一直运行,除非你手动kill它。三、Storm架构原理与Hadoop主从架构一样,Storm也采用Master/Slave体系结构,分布式计算由Nimbus和Supervisor两类服务进程实现,Nimbus进程运行在集群的主节点,负责任务的指派和分发,Supervisor运行在集群的从节点,负责执行任务的具体部分。架构图如下:如图所示:Nimbus: Storm集群的Master节点,负责资源分配和任务调度,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。Supervisor: Storm集群的从节点,负责接受Nimbus分配的任务,启动和停止属于自己管理的worker进程。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。Worker: 负责运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。Task: worker中每一个spout/bolt的线程称为一个task。同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。ZooKeeper: 用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行。四、Storm服务组件(1)Topology(拓扑)实时应用程序的逻辑被封装在 Storm topology(拓扑)中。Storm topology(拓扑)类似于 MapReduce 作业。两者之间关键的区别是 MapReduce 作业最终会完成, 而 topology(拓扑)任务会永远运行(除 非 kill 掉它)。一个拓扑是 Spout 和 Bolt 通过 stream groupings连接起 来的有向无环图。Stream 是 Storm 中的核心概念.一个 stream 是一个无界的、以分布式方式并行创建和处理的 Tuple 序列. stream 以一个 schema 来定义, 这个 schema 用来命名 stream tuple(元组)中的字段.默认情况下 Tuple 可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays 等数据类型.你也可以定义自己的 serializers, 以至于可以在 Tuple 中使用自定义的类型.每一个流在声明的时候会赋予一个 ID. 由于只包含一个 stream 的 Spout 和 Bolt 比较常见, OutputFieldsDeclarer 有更方便的方法可以定义一个单一的 stream 而不用指定ID. 这个 stream 被赋予一个默认的 ID, “default”.Topology模型图:其中包含:Spout: Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如MessageQueue、RDBMS、NoSQL、Realtime Log )不间断地读取数据并发送给Topology消息(tuple元组)。Bolt: Storm中的消息处理者,用于为Topology进行消息的处理,Bolt可以执行过滤,聚合, 查询数据库等操作,而且可以一级一级的进行处理。(2) 数据模型Tuplestorm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型,在我的理解里面一个tuple可以看作一个java对象。总体来看,storm支持所有的基本类型:字符串以及字节数组作为tuple的值类型。你也可以使用你自己定义的类型来作为值类型,只要你实现对应的序列化器(serializer)。一个Tuple代表数据流中的一个基本的处理单元,它可以包含多个Field,每个Field表示一个属性。Tuple是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value List。一个没有边界的,源源不断的,连续的Tuple序列就组成了Stream。topology里面的每个节点必须定义它要发射的tuple的每个字段。(3)SpoutSpout 是一个 topology(拓扑)中 streams 的源头. 通常 Spout 会从外部数据源读取 Tuple,然后把他们发送到 拓扑中(如 Kestel 队列, 或者 Twitter API). Spout 可以是 可靠的 或 不可靠的. 可靠的 Spout 在 Storm 处理失败的时 候能够重放 Tuple, 不可靠的 Spout 一旦把一个 Tuple 发送出去就撒手不管了.Spout 中的最主要的方法是 nextTuple. nextTuple 要么向 topology(拓扑)中发送一个新的 Tuple, 要么在没有 Tuple 需要发送的情况下直接返回. 对于任何 Spout 实现, nextTuple 方法都必须非阻塞的, 因为 Storm 在一个线程中调 用所有的 Spout 方法.Spout 的另外几个重要的方法是 ack 和 fail. 这些方法在 Storm 检测到 Spout 发送出去的 Tuple 被成功处理或者 处理失败的时候调用. ack和fail只会在可靠的 Spout 中调用(4)Bolt拓扑中所有的业务处理都在 Bolts 中完成. Bolt 可以做很多事情,过滤, 函数, 聚合, 关联, 与数据库交互等.拓扑中所有的业务处理都在 Bolts 中完成. Bolt 可以做很多事情,过滤, 函数, 聚合, 关联, 与数据库交互等.Bolt 中最主要的方法是 execute 方法, 当有一个新 Tuple 输入的时候会进入这个方法. Bolt 使用OutputCollector 对象发送 新的 Tuple. Bolt 必须在每一个 Tuple 处理完以后调用 OutputCollector 上的 ack 方法, Storm 就会知道 tuple 什么时候完成 ( 最终可以确定 调用源 Spout Tuple 是没有问题的). 当处理一个输入的 Tuple:会基于这个 Tuple 产生零个或者多个 Tuple 发 送出去,当所有的tuple 完成后,会调用 acking. Storm 提供了 IBasicBolt 接口会自动执行 acking .最好在 Bolt 中启动新的线程异步处理 tuples. OutputCollector 是线程安全的, 并且可以在任何时刻调用.(5)并行元素(Worker、Executor、Task)的关系一个Storm在集群上运行一个Topology时,主要通过以下3个实体来完成Topology的执行工作:Worker(进程)Executor(线程)Task下图简要描述了这3者之间的关系:1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程里会在每次循环里顺序调用所有task实例)。一个 task 执行实际的数据处理 - 在您代码中实现的每个 spout 或 bolt 在整个集群上都执行了许多的 task(任 务), 组件的 task(任务)数量在 topology(拓扑)的整个生命周期中总是相同的, 但组件的 executors(线程) 数量可能会随时间而变化。 这意味着以下条件成立: #threads ≤ #tasks. 默认情况下,tasks(任务)数量与 executors(执行器)设置成一样,即1个executor线程只运行1个task。六、Storm工作原理Nimbus 负责在集群分发的代码,topo只能在nimbus机器上提交,将任务分配给其他机器,和故障监测。Supervisor,监听分配给它的节点,根据Nimbus 的委派在必要时启动和关闭工作进程。 每个工作进程执行topology 的一个子集。一个运行中的topology 由很多运行在很多机器上的工作进程组成。在Storm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组Storm认为每个stream都有一个源,也就是原始元组的源头,叫做Spout(管口)处理stream内的tuple,抽象为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout再将spout中流出的tuple导向特定的bolt,bolt又对导入的流做处理后再导向其他bolt或者目的地。可以认为spout就是水龙头,并且每个水龙头里流出的水是不同的,我们想拿到哪种水就拧开哪个水龙头,然后使用管道将水龙头的水导向到一个水处理器(bolt),水处理器处理后再使用管道导向另一个处理器或者存入容器中。为了增大水处理效率,我们很自然就想到在同个水源处接上多个水龙头并使用多个水处理器,这样就可以提高效率。七、Storm与Hadoop的对比
文章
分布式计算  ·  搜索推荐  ·  NoSQL  ·  Hadoop  ·  测试技术  ·  数据处理  ·  调度  ·  数据库  ·  流计算  ·  容器
2022-05-15
Structured Streaming之Event-Time的Window操作
基于Event-Time的Window操作详细工作流程:方式一:package com.kfk.spark.structuredstreaming; import com.kfk.spark.common.CommSparkSession; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import scala.Tuple2; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/22 * @time : 9:44 下午 */ public class EventTimeWindow { public static void main(String[] args) throws StreamingQueryException { SparkSession spark = CommSparkSession.getSparkSession(); // input table Dataset<Row> dflines = spark.readStream() .format("socket") .option("host", "bigdata-pro-m04") .option("port", 9999) .option("includeTimestamp",true) .load(); /** * 输入数据:spark storm hive * 接受到数据模型: * 第一步:words -> spark storm hive, timestamp -> 2019-09-09 12:12:12 * eg: tuple -> (spark storm hive , 2019-09-09 12:12:12) * * 第二步:split()操作 * 对tuple中的key进行split操作 * * 第三步:flatMap()操作 * 返回类型 -> list(tuple(spark,2019-09-09 12:12:12), * tuple(storm,2019-09-09 12:12:12), * tuple(hive,2019-09-09 12:12:12)) */ Dataset<Row> words = dflines.as(Encoders.tuple(Encoders.STRING(),Encoders.TIMESTAMP())) .flatMap(new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() { @Override public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> stringTimestampTuple2) throws Exception { List<Tuple2<String, Timestamp>> result = new ArrayList<>(); for (String word : stringTimestampTuple2._1.split(" ")){ result.add(new Tuple2<>(word,stringTimestampTuple2._2)); } return result.iterator(); } },Encoders.tuple(Encoders.STRING(),Encoders.TIMESTAMP())).toDF("word","wordtime"); // result table Dataset<Row> windowedCounts = words.groupBy( functions.window(words.col("wordtime"), "10 minutes", "5 minutes"), words.col("word") ).count(); StreamingQuery query = windowedCounts.writeStream() .outputMode("update") .format("console") .option("truncate","false") .start(); query.awaitTermination(); } } 方式二:package com.kfk.spark.structuredstreaming; import java.sql.Timestamp; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/22 * @time : 9:25 下午 */ public class EventData { //timestamp: Timestamp, //word: String private Timestamp wordtime; private String word; public Timestamp getWordtime() { return wordtime; } public void setWordtime(Timestamp wordtime) { this.wordtime = wordtime; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public EventData() { } public EventData(Timestamp wordtime, String word) { this.wordtime = wordtime; this.word = word; } } package com.kfk.spark.structuredstreaming; import com.kfk.spark.common.CommSparkSession; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import java.sql.Timestamp; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/22 * @time : 9:44 下午 */ public class EventTimeWindow2 { public static void main(String[] args) throws StreamingQueryException { SparkSession spark = CommSparkSession.getSparkSession(); // input table Dataset<String> dflines = spark.readStream() .format("socket") .option("host", "bigdata-pro-m04") .option("port", 9999) .load().as(Encoders.STRING()); /** * 数据源:2019-03-29 16:40:00,hadoop * 2019-03-29 16:40:10,storm * * // 根据javaBean转换为dataset */ Dataset<EventData> dfeventdata = dflines.map(new MapFunction<String, EventData>() { @Override public EventData call(String value) throws Exception { String[] lines = value.split(","); return new EventData(Timestamp.valueOf(lines[0]),lines[1]); } }, ExpressionEncoder.javaBean(EventData.class)); // result table Dataset<Row> windowedCounts = dfeventdata.groupBy( functions.window(dfeventdata.col("wordtime"), "10 minutes", "5 minutes"), dfeventdata.col("word") ).count(); StreamingQuery query = windowedCounts.writeStream() .outputMode("update") .format("console") .option("truncate","false") .start(); query.awaitTermination(); } }
文章
流计算
2022-05-15
Structured Streaming中DS和DF的操作详解
创建流式的DataSet和DataFrame方式一:通过JavaBean方式Java语言实现:package com.kfk.spark.structuredstreaming; import java.sql.Date; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/21 * @time : 10:38 下午 */ public class DeviceData { // device: string, type: string, signal: double, time: DateType private String device; private String deviceType; private double signal; private Date deviceTime; public String getDevice() { return device; } public void setDevice(String device) { this.device = device; } public String getDeviceType() { return deviceType; } public void setDeviceType(String deviceType) { this.deviceType = deviceType; } public double getSignal() { return signal; } public void setSignal(double signal) { this.signal = signal; } public Date getDeviceTime() { return deviceTime; } public void setDeviceTime(Date deviceTime) { this.deviceTime = deviceTime; } public DeviceData(String device, String deviceType, double signal, Date deviceTime) { this.device = device; this.deviceType = deviceType; this.signal = signal; this.deviceTime = deviceTime; } public DeviceData(){ } } package com.kfk.spark.structuredstreaming; import com.kfk.spark.common.CommSparkSession; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import java.sql.Date; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/21 * @time : 10:35 下午 */ public class StruStreamingDFOper { public static void main(String[] args) throws StreamingQueryException { SparkSession spark = CommSparkSession.getSparkSession(); // input table Dataset<String> dflines = spark.readStream() .format("socket") .option("host", "bigdata-pro-m04") .option("port", 9999) .load().as(Encoders.STRING()); // 根据javaBean转换为dataset Dataset<DeviceData> dfdevice = dflines.map(new MapFunction<String, DeviceData>() { @Override public DeviceData call(String value) throws Exception { String[] lines = value.split(","); return new DeviceData(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3])); } }, ExpressionEncoder.javaBean(DeviceData.class)); // result table Dataset<Row> dffinal = dfdevice.select("device","deviceType").where("signal > 10").groupBy("deviceType").count(); // output StreamingQuery query = dffinal.writeStream() .outputMode("update") .format("console") .start(); query.awaitTermination(); } } 方式二:构造Schema的方式package com.kfk.spark.structuredstreaming; import com.kfk.spark.common.CommSparkSession; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.sql.Date; import java.util.ArrayList; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/21 * @time : 10:35 下午 */ public class StruStreamingDFOper2 { public static void main(String[] args) throws StreamingQueryException { SparkSession spark = CommSparkSession.getSparkSession(); // input table Dataset<String> dflines = spark.readStream() .format("socket") .option("host", "bigdata-pro-m04") .option("port", 9999) .load().as(Encoders.STRING()); List<StructField> fields = new ArrayList<StructField>(); StructField device = DataTypes.createStructField("device",DataTypes.StringType,true); StructField deviceType = DataTypes.createStructField("deviceType",DataTypes.StringType,true); StructField signal = DataTypes.createStructField("signal",DataTypes.DoubleType,true); StructField deviceTime = DataTypes.createStructField("deviceTime",DataTypes.DateType,true); fields.add(device); fields.add(deviceType); fields.add(signal); fields.add(deviceTime); // 构造Schema StructType scheme = DataTypes.createStructType(fields); // 根据schema转换为dataset Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() { @Override public Row call(String value) throws Exception { String[] lines = value.split(","); return RowFactory.create(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3])); } }, RowEncoder.apply(scheme)); // result table Dataset<Row> dffinal = dfdevice.select("device","deviceType").where("signal > 10").groupBy("deviceType").count(); // output StreamingQuery query = dffinal.writeStream() .outputMode("update") .format("console") .start(); query.awaitTermination(); } } Scala语言实现:package com.kfk.spark.structuredstreaming import com.kfk.spark.common.CommSparkSessionScala import java.sql.Date import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.StructType; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/22 * @time : 7:31 下午 */ object StruStreamingDFOperScala { case class DeviceData(device : String,deviceType : String , signal : Double ,deviceTime: Date) def main(args: Array[String]): Unit = { val spark = CommSparkSessionScala.getSparkSession() ; import spark.implicits._; // input table val socketDF = spark .readStream .format("socket") .option("host", "bigdata-pro-m04") .option("port", 9999) .load().as[String] // 构造Schema val userSchema = new StructType() .add("device", "string") .add("deviceType", "string") .add("signal", "double") .add("deviceTime", "date") // 根据schema转换为dataset val df_device = socketDF.map(x => { val lines = x.split(",") Row(lines(0),lines(1),lines(2),lines(3)) })(RowEncoder(userSchema)) // 根据case class转换为dataset val df : Dataset[DeviceData] = df_device.as[DeviceData] // result table val df_final = df.groupBy("deviceType").count() // output val query = df_final.writeStream .outputMode("update") .format("console") .start() query.awaitTermination() } } 测试数据:aa,ss,100,2020-12-21 bb,ss,47,2020-12-21 cc,ss,3,2020-12-21 dd,ss,46,2020-12-21 运行结果:------------------------------------------- Batch: 1 ------------------------------------------- +----------+-----+ |deviceType|count| +----------+-----+ | ss| 3| +----------+-----+ 创建流式的DataSet和DataFrame也可以通过创建临时表来进行业务分析package com.kfk.spark.structuredstreaming; import com.kfk.spark.common.CommSparkSession; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.sql.Date; import java.util.ArrayList; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/21 * @time : 10:35 下午 */ public class StruStreamingDFOper3 { public static void main(String[] args) throws StreamingQueryException { SparkSession spark = CommSparkSession.getSparkSession(); // input table Dataset<String> dflines = spark.readStream() .format("socket") .option("host", "bigdata-pro-m04") .option("port", 9999) .load().as(Encoders.STRING()); List<StructField> fields = new ArrayList<StructField>(); StructField device = DataTypes.createStructField("device",DataTypes.StringType,true); StructField deviceType = DataTypes.createStructField("deviceType",DataTypes.StringType,true); StructField signal = DataTypes.createStructField("signal",DataTypes.DoubleType,true); StructField deviceTime = DataTypes.createStructField("deviceTime",DataTypes.DateType,true); fields.add(device); fields.add(deviceType); fields.add(signal); fields.add(deviceTime); // 构造Schema StructType scheme = DataTypes.createStructType(fields); // 根据schema转换为dataset Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() { @Override public Row call(String value) throws Exception { String[] lines = value.split(","); return RowFactory.create(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3])); } }, RowEncoder.apply(scheme)); // 创建临时表 dfdevice.createOrReplaceTempView("device"); // result table Dataset<Row> dffinal = spark.sql("select * from device"); // output StreamingQuery query = dffinal.writeStream() .outputMode("append") .format("console") .start(); query.awaitTermination(); } }
文章
Java  ·  Scala  ·  流计算
2022-05-15
Spark Streaming之window滑动窗口详解
window滑动窗口Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行 计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作 为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3 秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口 计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数 值都必须是batch间隔的整数倍。(Spark Streaming对滑动窗口的支持,是比Storm更加完善和强 大的)window滑动窗口操作函数案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出 排名最靠前的3个搜索词以及出现次数执行reduceByKeyAndWindow,滑动窗口操作第二个参数,是窗口长度,这里是60秒第三个参数,是滑动间隔,这里是10秒也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对 一个RDD进行后续计算Java语言实现:package com.kfk.spark.window_hotwords_project; import com.kfk.spark.common.CommStreamingContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/18 * @time : 2:03 下午 */ public class WindowHotWordJava { static JavaStreamingContext jssc = null; public static void main(String[] args) throws InterruptedException { jssc = CommStreamingContext.getJssc(); /** * 数据模型:java * hive * spark * java */ JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999); /** * <java,1> * <hive,1> * ... */ JavaPairDStream<String, Integer> pair = inputDstream.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String line) throws Exception { return new Tuple2<>(line,1); } }); /** * <java,5> * <hive,3> * <spark,6> * <flink,10> */ JavaPairDStream<String, Integer> windowWordCount = pair.reduceByKeyAndWindow(new Function2<Integer,Integer,Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }, Durations.seconds(60),Durations.seconds(10)); JavaDStream<Tuple2<String, Integer>> finalStream = windowWordCount.transform( new Function<JavaPairRDD<String, Integer>, JavaRDD<Tuple2<String, Integer>>>() { @Override public JavaRDD<Tuple2<String, Integer>> call(JavaPairRDD<String, Integer> line) throws Exception { JavaPairRDD<Integer,String> beginPair = line.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1); } }); JavaPairRDD<Integer,String> sortRdd = beginPair.sortByKey(false); JavaPairRDD<String,Integer> sortPair = sortRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String,Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception { return new Tuple2<>(integerStringTuple2._2,integerStringTuple2._1); } }); List<Tuple2<String,Integer>> wordList = sortPair.take(3); for (Tuple2<String, Integer> stringIntegerTuple2 : wordList) { System.out.println(stringIntegerTuple2._1 + " : " + stringIntegerTuple2._2); } return jssc.sparkContext().parallelize(wordList); } }); finalStream.print(); jssc.start(); jssc.awaitTermination(); } } Scala语言实现package com.kfk.spark.window_hotwords_project import com.kfk.spark.common.CommStreamingContextScala import org.apache.spark.streaming.Seconds /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/18 * @time : 4:47 下午 */ object WindowHotWordScala { def main(args: Array[String]): Unit = { val jssc = CommStreamingContextScala.getJssc val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999) /** * 数据模型:java * hive * spark * java */ val pairDStream = inputDstream.map(x => (x,1)) /** * <java,1> * <hive,1> * ... */ val windowWordCount = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10)) /** * 热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次, * 并打印出 排名最靠前的3个搜索词以及出现次数 */ val finalDStream = windowWordCount.transform(x => { val sortRDD = x.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)) val list = sortRDD.take(3) jssc.sparkContext.parallelize(list) }) finalDStream.print() jssc.start() jssc.awaitTermination() } } 测试数据:java java hive hive java hava java hive 运行结果:------------------------------------------- Time: 1608705518000 ms ------------------------------------------- (hive,3) (java ,2) (java,2)
文章
分布式计算  ·  Java  ·  Scala  ·  流计算  ·  Spark
2022-05-15
Structured Streaming详解
一、Structured Streaming概述(1)Structured Streaming背景大多数的流式计算引擎(比如storm、spark streaming等)都仅仅关注流数据的计算方面:比如使 用一个map函数对一个流中每条数据都进行转换,或者是用reduce函数对一批数据进行聚合。但是, 实际上在大部分的流式计算应用中,远远不只是需要一个流式计算引擎那么简单。相反的,流式计 算仅仅在流式应用中占据一个部分而已。因此现在出现了一个新的名词,叫做 持续计算/应用 , continuous application。比如以下一些持续应用的例子:1、更新需要以服务形式实时提供出去的数据:例如,我们可能需要更新一份数据,然后其他用户会 通过web应用来实时查询这些数据。这种情况下,一个技术难题就是实时计算应用如何与实时数据服 务进行交互,比如说,当实时计算应用在更新数据的时候,如果用户通过实时数据服务来进行查询, 此时该如何处理?因此为了处理这种场景下的技术难题,就必须以一个完整的持续计算应用的方式 来构建整个系统,而不是站在实时计算的角度,仅仅考虑实时更新数据。2、实时ETL(Extract、Transform和Load):实时计算领域一个常见的应用就是,将一个存储系统中的数 据转换后迁移至另外一个存储系统。例如说,将JSON格式的日志数据迁移到Hive表中。这种场景下的技术 难题就在于,如何与两边的存储系统进行交互,从而保证数据不会丢失,同时也不会发生重复。这种协调 逻辑是非常复杂的。3、为一个已经存在的批量计算作业开发一个对应的实时计算作业:这个场景的技术难题在于,大多数的 流式计算引擎都无法保证说,它们计算出的结果是与离线计算结果相匹配的。例如说,有些企业会通过实 时计算应用来构建实时更新的dashboard,然后通过批量计算应用来构建每天的数据报表,此时很多用户 就会发现并且抱怨,离线报表与实时dashboard的指标是不一致的。4、在线机器学习:这类持续计算应用,通常都包含了大型的静态数据集以及批处理作业,还有实时数据 流以及实时预测服务等各个组件。以上这些例子就表明了在一个大型的流式计算应用中,流式计算本身其实只是占据了一个部分而已, 其他部分还包括了数据服务、存储以及批处理作业。但是目前的现状是,几乎所有的流式计算引擎都仅仅 是关注自己的那一小部分而已,仅仅是做流式计算处理。这就使得开发人员需要去处理复杂的流式计算应 用与外部存储系统之间的交互,比如说管理事务,同时保证他们的流式计算结果与离线批处理计算结果保 持一致。这就是目前流式计算领域急需要解决的难题与现状。持续计算应用可以定义为,对数据进行实时处理的整套应用系统。spark社区希望能够让开发人员仅仅使 用一套api,就可以完整持续计算应用中各个部分涉及的任务和操作,而这各个部分的任务和操作目前都 是通过分离的单个系统来完成的,比如说实时数据查询服务,以及与批处理作业的交互等。举例来说,未来对于解决这些问题的一些设想如下:1、更新那些需要被实时提供服务的数据:开发人员可以开发一个spark应用,来同时完成更新实时数据, 以及提供实时数据查询服务,可能是通过jdbc相关接口来实现。也可以通过内置的api来实现事务性的、 批量的数据更新,对一些诸如mysql、redis等存储系统。2、实时ETL:开发人员仅仅需要如同批处理作业一样,开发一样的数据转换操作,然后spark就可以自动 完成针对存储系统的操作,并且保证数据的一次且仅一次的强一致性语义。3、为一个批处理作业开发一个实时版本:spark可以保证实时处理作业与批处理作业的结果一定是一致的。4、在线机器学习:机器学习的api将会同时支持实时训练、定期批量训练、以及实时预测服务。(2)Structured Streaming概念Spark 2.0之后,引入的structured streaming,就是为了实现上述所说的continuous application,也就是持续计算的。首先,structured streaming是一种比spark更高级的api,主要是基于spark的批处理中的 高阶api,比如dataset/dataframe。此外,structured streaming也提供很多其他流式计算应用所无法提 供的功能:1、保证与批处理作业的强一致性:开发人员可以通过dataset/dataframe api以开发批处理作业的方式来 开发流式处理作业,进而structured streaming可以以增量的方式来运行这些计算操作。在任何时刻,流 式处理作业的计算结果,都与处理同一份batch数据的批处理作业的计算结果,是完全一致的。而大多数的 流式计算引擎,比如storm、kafka stream、flink等,是无法提供这种保证的。2、与存储系统进行事务性的整合:structured streaming在设计时就考虑到了,要能够基于存储系统保证数 据被处理一次且仅一次,同时能够以事务的方式来操作存储系统,这样的话,对外提供服务的实时数据才能在 任何时刻都保持一致性。目前spark 2.4版本的structured streaming,已经支持核心的外部存储服务。事务 性的更新是流式计算开发人员的一大痛点,其他的流式计算引擎都需要我们手动来实现,而structured streaming希望在内核中自动来实现。3、与spark的其他部分进行无缝整合:structured steaming已经提供了核心外部系统的整合集成的API。二、Structured Streaming编程模型(1)编程模型Structured Streaming的核心理念,就是将数据流抽象成一张表,而源源不断过来的数据是持续地 添加到这个表中的。 这就产生了一种全新的流式计算模型,与离线计算模型是很类似的。你可以使 用与在一个静态表中执行离线查询相同的方式来编写流式查询。spark会采用一种增量执行的方式 来对表中源源不断的数据进行查询。我们可以将输入数据流想象成是一张input table。数据流中 每条新到达的数据,都可以想象成是一条添加到表中的新数据。我们可以定义每次结果表中的数据更新时,以何种方式,将哪些数据写入外部存储。我们有多种模式的 output:complete mode: 被更新后的整个结果表中的数据,都会被写入外部存储。具体如何写入,是根据不 同的外部存储自身来决定的。append mode: 只有最近一次trigger之后,新增加到result table中的数据,会被写入外部存储。只 有当我们确定,result table中已有的数据是肯定不会被改变时,才应该使用append mode。update mode: 只有最近一次trigger之后,result table中被更新的数据,包括增加的和修改的,会 被写入外部存储中。(2)EventTimeevent-time指的是嵌入在数据自身内部的一个时间。在很多流式计算应用中,我们可能都需要根据 event-time来进行处理。例如,可能我们需要获取某个设备每分钟产生的事件的数量,那么我们就需要使 用事件产生时的时间,而不是spark接受到这条数据的时间。设备产生的每个事件都是input table中的一 行数据,而event-time就是这行数据的一个字段。这就可以支持我们进行基于时间窗口的聚合操作(例如 每分钟的事件数量),只要针对input table中的event-time字段进行分组和聚合即可。每个时间窗口就 是一个分组,而每一行都可以落入不同行的分组内。因此,类似这样的基于时间窗口的分组聚合操作,既 可以被定义在一份静态数据上,也可以被定义在一个实时数据流上。此外,这种模型也天然支持延迟到达的数据,late-data。spark会负责更新result table,因此它有绝对 的控制权来针对延迟到达的数据进行聚合结果的重新计算。自Spark2.1之后Structured Streaming 开始支持 watermark(水印),允许用户指定延时数据的阈值,并允许引擎相应地清除阈值范围之外的旧状态。(3)容错语义Structured Streaming的核心设计理念和目标之一,就是支持一次且仅一次的语义。为了实现这个目标, structured streaming设计将source、sink和execution engine来追踪计算处理的进度,这样就可以在任 何一个步骤出现失败时自动重试。每个streaming source都被设计成支持offset,进而可以让spark来追 踪读取的位置。spark基于checkpoint和wal来持久化保存每个trigger interval内处理的offset的范围。 sink被设计成可以支持在多次计算处理时保持幂等性,就是说,用同样的一批数据,无论多少次去更新 sink,都会保持一致和相同的状态。这样的话,综合利用基于offset的source,基于checkpoint和wal的 execution engine,以及基于幂等性的sink,可以支持完整的一次且仅一次的语义。三、基于WordCount程序讲解Structured Streaming编程模型package com.kfk.spark.structuredstreaming; import com.kfk.spark.common.CommSparkSession; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException; import java.util.Arrays; import java.util.Iterator; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/21 * @time : 9:04 下午 */ public class WordCountJava { public static void main(String[] args) throws StreamingQueryException { SparkSession spark = CommSparkSession.getSparkSession(); // Create DataFrame representing the stream of input lines from connection to localhost:9999 Dataset<Row> lines = spark.readStream() .format("socket") .option("host", "bigdata-pro-m04") .option("port", 9999) .load(); // Split the lines into words Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } },Encoders.STRING()); // Generate running word count Dataset<Row> wordCounts = words.groupBy("value").count(); // Start running the query that prints the running counts to the console StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start(); query.awaitTermination(); } } 我们可以以wordcount例子作为背景来理解,lines dataframe是一个input table,而 wordcounts dataframe就是一个result table。当应用启动后,spark会周期性地check socket输入源中 是否有新数据到达。如果有新数据到达,那么spark会将之前的计算结果与新到达的数据整合起来,以增 量的方式来运行我们定义的计算操作,进而计算出最新的单词计数结果。
文章
存储  ·  机器学习/深度学习  ·  SQL  ·  分布式计算  ·  Java  ·  数据库连接  ·  API  ·  HIVE  ·  流计算  ·  Spark
2022-05-15
基于Spark Streaming对新闻网站项目案例分析
一、需求分析新闻网站需求:pvuv注册用户数热门板块数据处理流程:数据源 -> kafka -> spark streaming二、数据准备(1)数据格式网站日志格式 :date,timestamp,userid,pageid,section,action日志字段说明:date: 日期,yyyy-MM-dd格式 timestamp: 时间戳 userid: 用户id pageid: 页面id section: 版块 action: 用户行为,两类,点击页面和注册 数据展示:2020-12-20 1608451521565 364 422 fashion view 2020-12-20 1608451521565 38682 708 aviation view 2020-12-20 1608451521565 65444 270 internet view 2020-12-20 1608451521565 4805 250 tv-show view 2020-12-20 1608451521565 1130 743 movie view 2020-12-20 1608451521565 85320 605 carton view 2020-12-20 1608451521565 null 581 movie view 2020-12-20 1608451521565 null null null register kafka消费者启动:bin/kafka-console-consumer.sh --bootstrap-server bigdata-pro-m04:9092 --topic spark (2)基于Java开发实时数据生成器这里生成的实时数据流,生成的传递给kafka,每隔1秒随机生成1000条数据。package com.kfk.spark.news_analysis_project; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.Random; /** * 访问日志Kafka Producer * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/12 * @time : 7:51 下午 */ public class AccessProducer extends Thread{ private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd"); private static String date; // 版块内容 private static String[] sections = new String[] {"country", "international", "sport", "entertainment", "movie", "carton", "tv-show", "technology", "internet", "car", "military", "funny", "fashion", "aviation", "government"}; private static Random random = new Random(); private static int[] newOldUserArr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; private Producer<String,String> producer; private String topic; /** * 构造函数 * @param topic */ public AccessProducer(String topic){ this.topic = topic; producer = new KafkaProducer<String, String>(createProducerConfig()); date = simpleDateFormat.format(new Date()); } /** * createProducerConfig * @return */ public Properties createProducerConfig(){ Properties properties = new Properties(); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("bootstrap.servers", "bigdata-pro-m04:9092"); return properties; } @Override public void run(){ int counter = 0; while (true){ // 生成1000条访问数据 for (int i = 0;i < 1000;i++){ String log = null; // 生成条访问数据 if (newOldUserArr[random.nextInt(10)] == 1){ log = getAccessLog(); } else { log = getRegisterLog(); } // 将数据发送给kafka producer.send(new ProducerRecord<String, String>(topic,log)); counter++; if (counter == 100){ counter = 0; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } } /** * 生成注册数据 * @return */ private static String getRegisterLog(){ StringBuffer stringBuffer = new StringBuffer(""); // 生成时间戳 long timestamp = System.currentTimeMillis(); // 随机生成userid(默认1000注册用户,每天1/10的访客是未注册用户) Long userid = 0L; int newOldUser = newOldUserArr[random.nextInt(10)]; if (newOldUser == 1){ userid = null; } else { userid = (long)random.nextInt(100000); } // 随机生成pageid,共1000个页面 long pageid = random.nextInt(1000); // 随机生成板块 String section = sections[random.nextInt(sections.length)]; // 生成固定的行为,view String action = "view"; return stringBuffer.append(date).append(" ") .append(timestamp).append(" ") .append(userid).append(" ") .append(pageid).append(" ") .append(section).append(" ") .append(action).toString(); } /** * 生成访问数据 * @return */ private static String getAccessLog(){ StringBuffer stringBuffer = new StringBuffer(""); // 生成时间戳 long timestamp = System.currentTimeMillis(); // 新用户都是userid为null Long userid = null; // 生成随机pageid,都是null Long pageid = null; // 生成随机版块,都是null String section = null; // 生成固定的行为,view String action = "register"; return stringBuffer.append(date).append(" ") .append(timestamp).append(" ") .append(userid).append(" ") .append(pageid).append(" ") .append(section).append(" ") .append(action).toString(); } public static void main(String[] args) { AccessProducer accessProducer = new AccessProducer("spark"); accessProducer.start(); } } 三、实施过程将每一个需求写入到一个方法中,运行流程分析及结果以注释的形式显示。关于数据模型,可以参考基于Spark SQL对新闻网站项目案例分析这篇文章https://blog.csdn.net/weixin_45366499/article/details/111119234package com.kfk.spark.common; import org.apache.spark.SparkConf; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaStreamingContext; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 8:23 下午 */ public class CommStreamingContext { public static JavaStreamingContext getJssc(){ SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext"); return new JavaStreamingContext(conf, Durations.seconds(5)); } } package com.kfk.spark.news_analysis_project; import com.kfk.spark.common.CommStreamingContext; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import scala.Tuple2; import java.util.*; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/20 * @time : 4:11 下午 */ public class NewsRealTime { /** * input data: * 2020-12-20 1608451521565 364 422 fashion view * 2020-12-20 1608451521565 38682 708 aviation view * ... * @param args */ public static void main(String[] args) throws InterruptedException { JavaStreamingContext jssc = CommStreamingContext.getJssc(); // sparkstreaming与kafka连接 Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "streaming_kafka_1"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); // 设置topic Collection<String> topics = Collections.singletonList("spark"); // kafka数据源 JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream( jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) ); /** * stream -> map -> JavaDStream */ JavaDStream<String> accessDstream = stream.map(new Function<ConsumerRecord<String, String>, String>() { @Override public String call(ConsumerRecord<String, String> v1) throws Exception { return v1.value(); } }); /** * accessDStream -> filter -> action(view) */ JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() { @Override public Boolean call(String v1) throws Exception { String[] lines = v1.split(" "); String action = lines[5]; String actionValue = "view"; if (actionValue.equals(action)){ return true; } else { return false; } } }); // 求网页的pv calculatePagePV(filterDstream); // 求网页的uv calculatePageUV(filterDstream); // 求注册用户数 calculateRegistercount(accessDstream); // 求热门板块 calculateUserSectionPV(accessDstream); jssc.start(); jssc.awaitTermination(); } /** * 求网页的pv * input data: * 2020-12-20 1608451521565 364 422 fashion view * * 数据演化过程: * filterDstream -> mapToPair -> <2020-12-20_422,1> -> reduceByKey -> <2020-12-20_422,5> * * @param filterDstream */ public static void calculatePagePV(JavaDStream<String> filterDstream){ /** * filterDstream -> mapToPair -> <2020-12-20_422,1> */ JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String,Integer> call(String lines) throws Exception { String[] line = lines.split(" "); return new Tuple2<String,Integer>(line[0] + "_" + line[3], 1); } }); /** * pairDstream -> reduceByKey -> <2020-12-20_422,5> */ JavaPairDStream<String,Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); pvStream.print(); /** * (2020-12-21_16,1) * (2020-12-21_548,1) * (2020-12-21_881,1) * (2020-12-21_27,1) * (2020-12-21_771,1) * (2020-12-21_344,2) * (2020-12-21_313,1) * (2020-12-21_89,1) * (2020-12-21_14,1) * (2020-12-21_366,1) * ... */ } /** * 求网页的uv * input data: * 2020-12-20 1608451521565 364 422 fashion view * 2020-12-20 1608451521565 364 422 fashion view * 2020-12-20 1608451521565 365 422 fashion view * 2020-12-20 1608451521565 366 422 fashion view * 2020-12-20 1608451521565 367 422 fashion view * 2020-12-20 1608451521565 367 453 fashion view * * 数据演化过程: * 第一步:map * (2020-12-20,364,422) * (2020-12-20,364,422) * (2020-12-20,365,422) * (2020-12-20,366,422) * (2020-12-20,367,422) * (2020-12-20,367,453) * * 第二步:rdd -> distinct * (2020-12-20,364,422) * (2020-12-20,365,422) * (2020-12-20,366,422) * (2020-12-20,367,422) * (2020-12-20,367,453) * * 第三步:mapToPair * <2020-12-20_422,1> * <2020-12-20_422,1> * <2020-12-20_422,1> * <2020-12-20_422,1> * <2020-12-20_453,1> * * 第四步:reduceByKey * <2020-12-20_422,4> * <2020-12-20_453,1> * * @param filterDstream */ public static void calculatePageUV(JavaDStream<String> filterDstream){ /** * filterDstream -> map -> (2020-12-20,364,422) */ JavaDStream<String> mapDstream = filterDstream.map(new Function<String, String>() { @Override public String call(String lines) throws Exception { String[] line = lines.split(" "); return line[0] + "," + line[2] + "," + line[3]; } }); /** * mapDstream -> distinct */ JavaDStream<String> distinctDstream = mapDstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { @Override public JavaRDD<String> call(JavaRDD<String> lines) throws Exception { return lines.distinct(); } }); /** * distinctDstream -> mapToPair -> <2020-12-20_422,1> */ JavaPairDStream<String,Integer> pairDstream = distinctDstream.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String lines) throws Exception { String[] line = lines.split(","); return new Tuple2<>(line[0] + "_" + line[2], 1); } }); /** * pairDstream -> reduceByKey -> <2020-12-20_422,4> */ JavaPairDStream<String,Integer> uvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); uvStream.print(); /** * (2020-12-21_492,1) * (2020-12-21_85,2) * (2020-12-21_18,1) * (2020-12-21_27,2) * (2020-12-21_825,1) * (2020-12-21_366,1) * (2020-12-21_89,1) * (2020-12-21_14,2) * (2020-12-21_69,1) * (2020-12-21_188,1) * ... */ } /** * 求注册用户数:过滤出action=register的数据就可以 * input data: * 2020-12-20 1608451521565 364 422 fashion view * * 数据演化过程: * accessDStream -> filter -> action(register) -> mapToPair -> reduceByKey * * @param accessDstream */ public static void calculateRegistercount(JavaDStream<String> accessDstream){ /** * accessDStream -> filter -> action(register) */ JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() { @Override public Boolean call(String v1) throws Exception { String[] lines = v1.split(" "); String action = lines[5]; String actionValue = "register"; if (actionValue.equals(action)){ return true; } else { return false; } } }); /** * filterDstream -> mapToPair -> <2020-12-20_register,1> */ JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String,Integer> call(String lines) throws Exception { String[] line = lines.split(" "); return new Tuple2<String,Integer>(line[0] + "_" + line[5], 1); } }); /** * pairDstream -> reduceByKey -> <2020-12-20_register,5> */ JavaPairDStream<String,Integer> registerCountStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); registerCountStream.print(); /** * (2020-12-21_register,11) */ } /** * 求出热门板块 * input data: * 2020-12-20 1608451521565 364 422 fashion view * * 数据演化过程: * filterDstream -> mapToPair -> <2020-12-20_fashion,1> -> reduceByKey -> <2020-12-20_fashion,5> * * @param filterDstream */ public static void calculateUserSectionPV(JavaDStream<String> filterDstream){ /** * filterDstream -> mapToPair -> <2020-12-20_fashion,1> */ JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String,Integer> call(String lines) throws Exception { String[] line = lines.split(" "); return new Tuple2<String,Integer>(line[0] + "_" + line[4], 1); } }); /** * pairDstream -> reduceByKey -> <2020-12-20_fashion,5> */ JavaPairDStream<String,Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); pvStream.print(); /** * (2020-12-21_internet,16) * (2020-12-21_military,24) * (2020-12-21_aviation,21) * (2020-12-21_carton,19) * (2020-12-21_government,25) * (2020-12-21_tv-show,19) * (2020-12-21_country,14) * (2020-12-21_movie,13) * (2020-12-21_international,16) * ... */ } } 这里没有做过多的业务分析,可以根据前面所学的知识进行扩展,将算子灵活运用组合起来!
文章
消息中间件  ·  SQL  ·  分布式计算  ·  Java  ·  Kafka  ·  数据处理  ·  Spark  ·  流计算  ·  数据格式
2022-05-15
1 2 3 4 5 6 7 8 9
...
20
跳转至:
大数据
183865 人关注 | 22884 讨论 | 50211 内容
+ 订阅
  • 被人家忽略的强大的批处理框架-Spring Batch(上)
  • 我在想这到底是款什么神仙工具,这么牛逼(下)
  • 设计模式与范式 --- 结构型模式(适配器模式)
查看更多 >
开发与运维
5177 人关注 | 125216 讨论 | 178290 内容
+ 订阅
  • 设计模式与范式 --- 适配器模式、装饰器模式与代理模式的区别与联系
  • 被人家忽略的强大的批处理框架-Spring Batch(上)
  • Java对象与Java类(下)
查看更多 >
数据库
248671 人关注 | 44341 讨论 | 53765 内容
+ 订阅
  • 被人家忽略的强大的批处理框架-Spring Batch(上)
  • Java对象与Java类(下)
  • Oracle学习(十五):分布式数据库
查看更多 >
人工智能
2594 人关注 | 9266 讨论 | 60379 内容
+ 订阅
  • JavaScript的使用
  • 数据结构-红黑树分析+代码
  • 位运算系列
查看更多 >
微服务
22755 人关注 | 9844 讨论 | 18714 内容
+ 订阅
  • 被人家忽略的强大的批处理框架-Spring Batch(上)
  • Java权限认证有它就够了
  • 我在想这到底是款什么神仙工具,这么牛逼(下)
查看更多 >