本节书摘来异步社区《Storm技术内幕与大数据实践》一书中的第9章,第9.1节,作者: 陈敏敏 , 黄奉线 , 王新春
责编: 杨海玲,更多章节内容可以访问云栖社区“异步社区”公众号查看。
9.1 实时DAU计算
DAU是每天访问的UV数,00:00~24:00内相同的客户端只被计算一次。UV是非常核心的一个指标,通过对每个时间点的DAU数据的分析,可以查看运营活动的效率以及当前网站运行的整体情况等,可以对系统优化和运营效率提升等起到很好的促进作用。目前大众点评各个平台的累积DAU达到千万级,PV到亿级。本节以DAU为例,简要介绍一下大众点评是怎么使用实时平台的。实时DAU计算包括了大众点评的所有不同平台移动客户端(大众点评APP、大众点评团APP和周边快查APP)、PC端和M站。
移动端实时DAU的Topology计算逻辑的DAG参见图9-1。
在Storm UI上Topology的运行情况如图9-2所示。
.
各个组件的功能描述如下。
(1)BlackholeBlockingQueueSpout:作为Blackhole的Consumer获取Mobile的日志数据。源码可以参考https://github.com/xinchun-wang/storm-util。
(2)MobileLogParserBolt:解析Mobile的日志,输出后续计算所需要的数据,具体包含trainId(不同平台的trainId不一样。例如,Android为7,iOS为10)、deviceId是设备ID(deviceId可能是IMEI、UUID、MAC、UDID、IDFA、OPENUDID中的一个或多个,根据不同的操作系统和才做系统版本来确定)、addtime表示日志达到时间、source表示APP是从哪个AppStore安装的、userId为登录后的大众点评的用户ID。Spout的数据Shuffle到MobileLogParserBolt上,保证每个日志的Parser节点分到的数据基本上相同。
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("trainId","deviceId", "addtime", "source", "userId"));
}
(3)DPIDBolt:根据deviceId和trainId从HBase获取对应的DPID。DPID是大众点评对每个安装了点评APP的设备所标识的唯一ID,只要设备不变,DPID就是同一个。deviceId和DPID存在映射关系,目的是当deviceId切换(随着OS对安全性的策略改变,设备可以获取到的ID会发生变化,如从IMEI变成UUID)的时候,我们还可以正确标识这个设备。在HBase中,维护了DPID到deviceId的多对多的映射关系。MobileLogParserBolt输出的数据根据trainId和deviceIdfieldsGrouping输出到DPIDBolt中,相同的trainId和deviceId可以到同一个Bolt。这样DPIDBolt可以在内部缓存,减少HBase访问的次数。DPIDBolt的输出为:
@Override
public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
outputfieldsdeclarer.declare(new Fields("trainId", "dpid", "isNew", " addtime", "source"));
}
在DPIDBolt中,同时会实时更新DPID和userId的映射关系。如果这个DPID是今天新产生的,那么isNew为true,表示是个新用户。
(4)PartialUVBolt:在PartialUVBolt中,主要是统计不同设备的DPID出现次数(UV),如果今天这个DPID之前没有访问过,次数加1,否则不计算。为了保证数据不出错,数据会存储到Redis中。当某个Bolt出现错误的时候,数据不会丢失。每隔10秒会将计数值输出。
@Override
public void declareOutputFields(OutputFieldsDeclareroutputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("type", "date", "dau", "newUV", "source"));
}
type等同于前面的trainId,date表示当前是那个计算周期,dau就是当前计算周期的UV值,newUV是当前计算周期的新UV值,source是当前计算周期来源的渠道。DPIDBolt和PartialUVBolt之间是fieldsGrouping,也就是相同的trainId和DPID发送到同一个PartialUVBolt中。PartialUVBolt的数据是每隔一定周期发射出去,具体的周期是依靠Tick Tuple消息来完成。重载在Bolt的getComponentConfiguration()方法:
@Override
public Map getComponentConfiguration(){
Map<String, Object>conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Constants.EMIT_FREQUENCY_IN_SECONDS);
returnconf;
}
这样PartialUVBolt就可以以Constants.EMIT_FREQUENCY_IN_SECONDS的频率收到Tick Tuple消息,然后在Bolt的execute方法中,判断是Tick Tuple,就发射数据出去。判断的方法为:
public static booleanisTickTuple(Tuple tuple) {
returntuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
(5)AggregatorUVBolt:AggregatorUVBolt完成的是将PartialUVBolt的数据聚合起来,根据不同的type计算当前周期的数据汇总,汇总完毕的数据发射到PersistenceUVBolt中。
(6)PersistenceUVBolt:PersistenceUVBolt就是将数据写入MySQL中,然后由RPC服务提供给不同的使用者,包括Dashboard、微信公众号和大众点评的内部APP等,用来展示或者报警等。
整个Topology的构建参考下面的代码逻辑:
public class MobileUVTopology {
private static final int TOPOLOGY_NAME_INDEX = 0;
private static final String BLOCKHOLE_TOPIC = "dpods_log_mobile-log-web_MAIN";
private static final String MOBILE_WEB_MAIN_SPOUT_ID = "MobileWebMainSpout";
private static final String LOG_PARSER_ID = "LogParser";
private static final String DPID_ID = "DPID";
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
String TopologyName = getTopologyName(args);
builder.setSpout(MOBILE_WEB_MAIN_SPOUT_ID ,
newBlackholeBlockingQueueSpout(BLOCKHOLE_TOPIC, getTopologyName(args)),
CommonUtil.getParallelism(args, 1));
builder.setBolt(LOG_PARSER_ID, new MobileLogParserBolt(),
20).shuffleGrouping(MOBILE_WEB_MAIN_SPOUT_ID, BLOCKHOLE_TOPIC);
builder.setBolt(DPID_ID, new DPIDBolt(),
24).fieldsGrouping(LOG_PARSER_ID, new Fields("trainId","deviceId"));
builder.setBolt(Constants.PARTIAL_UV_ID, new PartialUVBolt("APP"),
16).fieldsGrouping(DPID_ID, new Fields("trainId","dpid"));
builder.setBolt(Constants.AGGREGATOR_UV_ID, new AggregatorUVBolt(),
1).noneGrouping(Constants.PARTIAL_UV_ID);
builder.setBolt(Constants.PERSISTENCE_UV_ID, new PersistenceUVBolt("APP",
TopologyName), 2).shuffleGrouping(Constants.AGGREGATOR_UV_ID);
Configconf = new Config();
if (args != null &&args.length> 0) {
conf.setNumWorkers(8);
conf.registerMetricsConsumer(
backtype.storm.metric.LoggingMetricsConsumer.class, 1);
conf.registerMetricsConsumer(
com.dianping.cosmos.metric.CatMetricsConsumer.class, 1);
StormSubmitter.submitTopology(args[0], conf,builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("MobileUV", conf, builder.createTopology());
}
}
private static String getTopologyName(String[] args) {
try {
returnargs[TOPOLOGY_NAME_INDEX];
} catch (Exception e) {
return "MovileUV";
}
}
}
网页实时DAU结果的部分Dashboard如图9-3所示。
从图上可以看出,10点半左右有个高峰,通常是某个运营活动(如抽奖、抢红包等)产生的,从该图可以直接看出运营的效果。
从日环比和周同比(如图9-4所示)可以看出今天的用户访问情况是增加还是减少了,如果发生明显增加或者减少就可以及时分析问题,采取应对的策略。