《Storm技术内幕与大数据实践》一9.1 实时DAU计算

简介:

本节书摘来异步社区《Storm技术内幕与大数据实践》一书中的第9章,第9.1节,作者: 陈敏敏 , 黄奉线 , 王新春
责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。

9.1 实时DAU计算

DAU是每天访问的UV数,00:00~24:00内相同的客户端只被计算一次。UV是非常核心的一个指标,通过对每个时间点的DAU数据的分析,可以查看运营活动的效率以及当前网站运行的整体情况等,可以对系统优化和运营效率提升等起到很好的促进作用。目前大众点评各个平台的累积DAU达到千万级,PV到亿级。本节以DAU为例,简要介绍一下大众点评是怎么使用实时平台的。实时DAU计算包括了大众点评的所有不同平台移动客户端(大众点评APP、大众点评团APP和周边快查APP)、PC端和M站。

移动端实时DAU的Topology计算逻辑的DAG参见图9-1。


9_1


在Storm UI上Topology的运行情况如图9-2所示。

.
9_2

各个组件的功能描述如下。

(1)BlackholeBlockingQueueSpout:作为Blackhole的Consumer获取Mobile的日志数据。源码可以参考https://github.com/xinchun-wang/storm-util

(2)MobileLogParserBolt:解析Mobile的日志,输出后续计算所需要的数据,具体包含trainId(不同平台的trainId不一样。例如,Android为7,iOS为10)、deviceId是设备ID(deviceId可能是IMEI、UUID、MAC、UDID、IDFA、OPENUDID中的一个或多个,根据不同的操作系统和才做系统版本来确定)、addtime表示日志达到时间、source表示APP是从哪个AppStore安装的、userId为登录后的大众点评的用户ID。Spout的数据Shuffle到MobileLogParserBolt上,保证每个日志的Parser节点分到的数据基本上相同。

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
 declarer.declare(new Fields("trainId","deviceId", "addtime", "source", "userId"));
}

(3)DPIDBolt:根据deviceId和trainId从HBase获取对应的DPID。DPID是大众点评对每个安装了点评APP的设备所标识的唯一ID,只要设备不变,DPID就是同一个。deviceId和DPID存在映射关系,目的是当deviceId切换(随着OS对安全性的策略改变,设备可以获取到的ID会发生变化,如从IMEI变成UUID)的时候,我们还可以正确标识这个设备。在HBase中,维护了DPID到deviceId的多对多的映射关系。MobileLogParserBolt输出的数据根据trainId和deviceIdfieldsGrouping输出到DPIDBolt中,相同的trainId和deviceId可以到同一个Bolt。这样DPIDBolt可以在内部缓存,减少HBase访问的次数。DPIDBolt的输出为:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
  outputfieldsdeclarer.declare(new Fields("trainId", "dpid", "isNew", " addtime", "source"));
}

在DPIDBolt中,同时会实时更新DPID和userId的映射关系。如果这个DPID是今天新产生的,那么isNew为true,表示是个新用户。

(4)PartialUVBolt:在PartialUVBolt中,主要是统计不同设备的DPID出现次数(UV),如果今天这个DPID之前没有访问过,次数加1,否则不计算。为了保证数据不出错,数据会存储到Redis中。当某个Bolt出现错误的时候,数据不会丢失。每隔10秒会将计数值输出。

@Override
public void declareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer) {
  outputFieldsDeclarer.declare(new Fields("type", "date", "dau", "newUV", "source"));
}

type等同于前面的trainId,date表示当前是那个计算周期,dau就是当前计算周期的UV值,newUV是当前计算周期的新UV值,source是当前计算周期来源的渠道。DPIDBolt和PartialUVBolt之间是fieldsGrouping,也就是相同的trainId和DPID发送到同一个PartialUVBolt中。PartialUVBolt的数据是每隔一定周期发射出去,具体的周期是依靠Tick Tuple消息来完成。重载在Bolt的getComponentConfiguration()方法:

@Override
public Map getComponentConfiguration(){
  Map<String, Object>conf = new HashMap<String, Object>();
  conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Constants.EMIT_FREQUENCY_IN_SECONDS);
  returnconf;
}

这样PartialUVBolt就可以以Constants.EMIT_FREQUENCY_IN_SECONDS的频率收到Tick Tuple消息,然后在Bolt的execute方法中,判断是Tick Tuple,就发射数据出去。判断的方法为:

public static booleanisTickTuple(Tuple tuple) {
  returntuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

(5)AggregatorUVBolt:AggregatorUVBolt完成的是将PartialUVBolt的数据聚合起来,根据不同的type计算当前周期的数据汇总,汇总完毕的数据发射到PersistenceUVBolt中。

(6)PersistenceUVBolt:PersistenceUVBolt就是将数据写入MySQL中,然后由RPC服务提供给不同的使用者,包括Dashboard、微信公众号和大众点评的内部APP等,用来展示或者报警等。

整个Topology的构建参考下面的代码逻辑:

public class MobileUVTopology {
private static final int TOPOLOGY_NAME_INDEX = 0;
  private static final String BLOCKHOLE_TOPIC = "dpods_log_mobile-log-web_MAIN";
private static final String MOBILE_WEB_MAIN_SPOUT_ID = "MobileWebMainSpout";
private static final String LOG_PARSER_ID = "LogParser";
private static final String DPID_ID = "DPID";
public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  String TopologyName = getTopologyName(args);

  builder.setSpout(MOBILE_WEB_MAIN_SPOUT_ID , 
    newBlackholeBlockingQueueSpout(BLOCKHOLE_TOPIC, getTopologyName(args)),
      CommonUtil.getParallelism(args, 1));
  builder.setBolt(LOG_PARSER_ID, new MobileLogParserBolt(), 
    20).shuffleGrouping(MOBILE_WEB_MAIN_SPOUT_ID, BLOCKHOLE_TOPIC);
  builder.setBolt(DPID_ID, new DPIDBolt(), 
    24).fieldsGrouping(LOG_PARSER_ID, new Fields("trainId","deviceId"));
  builder.setBolt(Constants.PARTIAL_UV_ID, new PartialUVBolt("APP"), 
    16).fieldsGrouping(DPID_ID, new Fields("trainId","dpid"));
  builder.setBolt(Constants.AGGREGATOR_UV_ID, new AggregatorUVBolt(), 
    1).noneGrouping(Constants.PARTIAL_UV_ID);
  builder.setBolt(Constants.PERSISTENCE_UV_ID, new PersistenceUVBolt("APP",
    TopologyName), 2).shuffleGrouping(Constants.AGGREGATOR_UV_ID);
  Configconf = new Config();
  if (args != null &&args.length> 0) {
    conf.setNumWorkers(8);
    conf.registerMetricsConsumer(
      backtype.storm.metric.LoggingMetricsConsumer.class, 1);
    conf.registerMetricsConsumer(
      com.dianping.cosmos.metric.CatMetricsConsumer.class, 1);
    StormSubmitter.submitTopology(args[0], conf,builder.createTopology());
    } else {
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("MobileUV", conf, builder.createTopology());
    }
  }
  private static String getTopologyName(String[] args) {
    try {
     returnargs[TOPOLOGY_NAME_INDEX];
    } catch (Exception e) {
     return "MovileUV";
    }
  }
}

网页实时DAU结果的部分Dashboard如图9-3所示。

从图上可以看出,10点半左右有个高峰,通常是某个运营活动(如抽奖、抢红包等)产生的,从该图可以直接看出运营的效果。

从日环比和周同比(如图9-4所示)可以看出今天的用户访问情况是增加还是减少了,如果发生明显增加或者减少就可以及时分析问题,采取应对的策略。


9_3_4

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
11天前
|
JavaScript 前端开发 大数据
数字太大了,计算加法、减法会报错,结果不正确?怎么办?用JavaScript实现大数据(超过20位的数字)相加减运算。
数字太大了,计算加法、减法会报错,结果不正确?怎么办?用JavaScript实现大数据(超过20位的数字)相加减运算。
|
6月前
|
SQL 分布式计算 大数据
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 入门
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 入门
67 0
|
6月前
|
SQL 存储 大数据
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 语法与概念
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 语法与概念
74 0
|
9月前
|
Cloud Native 大数据
阿里云最新产品手册——阿里云核心产品——云原生大数据计算服务——云原生、大数据、计算的结合
阿里云最新产品手册——阿里云核心产品——云原生大数据计算服务——云原生、大数据、计算的结合自制脑图
174 1
|
5月前
|
消息中间件 分布式计算 Kafka
将Apache Flink任务实时消费Kafka窗口的计算改为MaxCompute
将Apache Flink任务实时消费Kafka窗口的计算改为MaxCompute
77 6
|
6月前
|
分布式计算 Unix MaxCompute
在MaxCompute中,时间的计算是基于秒的
在MaxCompute中,时间的计算是基于秒的
38 1
|
7月前
|
存储 JSON 自然语言处理
【ODPS新品发布第2期】实时数仓Hologres:推出计算组实例/支持JSON数据/向量计算+大模型等新能力
本期将重点介绍Hologres推出计算组实例,Hologres支持JSON数据 ,Hologres向量计算+大模型能力,Hologres数据同步新能力,Hologres数据分层存储
|
8月前
|
存储 SQL 分布式计算
MaxCompute发布按量付费闲时版,计算成本最高节省66.66%!
在大数据不断在追求计算效率和成本优化的背景下,阿里云云原生大数据计算服务 MaxCompute宣布推出按量付费闲时版,用户可选择用此版本完成时间不敏感的作业,从而降低计算成本,同等作业类型的计算费用与按量付费标准版相比,最高可实现66.66%的计算成本优化。
278 1
|
9月前
|
SQL JSON 关系型数据库
php执行语句在MySQL批量插入大数据量的解决方案及计算程序执行时间(大数据量、MySQL语句优化)
php执行语句在MySQL批量插入大数据量的解决方案及计算程序执行时间(大数据量、MySQL语句优化)
195 1
|
9月前
|
分布式计算 大数据 数据挖掘
云计算与大数据期末项目 电商大数据离线计算
云计算与大数据期末项目 电商大数据离线计算
87 0