使用MaxCompute进行纽约的士拼车分析

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 我们通过将纽约的士的时空数据转换成为图的方法,使用odps自带的graph分析工具来进行拼车分析。

前言

最近几年以来出现的共享的士(Uber,Lyft,滴滴)给人们的出行带来了极大的便利。随着烧钱大战的结束,中美市场大局已定,为了维持高估值(Uber 80 Billion $, 滴滴30 Billion $),缩减亏损,增长净利润,继而进入上市流程,几大公司都开始发掘盈利的规则。带来的影响是,共享出行的用户们发现:1)车越来越难打,价钱越来越高;2)使用拼车会大幅度增长时间损耗,而带来的金钱节约却并不明显;3)司机发现盈利有限,真正愿意开车的司机越来越少,某拼车公司正在慢慢的转变成为一个的士信息服务平台。这与其做成让用户通过手机便捷,实惠出行的愿景大相径庭。

我想说一个故事,作为这篇文章所要解决的的一个问题的引子。那天天气炎热,我正在公交等去高铁站的公共汽车,由于太热,我决定打一辆车,当我拦下来一辆车后跟司机说15块钱到车站,司机答应了。而此时跟我同时等公交的另外一个陌生人也过来问我们要到哪里去,当他得知目的地是火车站以后,表示也想搭车,这时候司机坐地起价要他加10块钱。这位陌生人想想觉得可以,就加了10块钱给他。设想一下,如果我在拦的士之前就知道这个陌生人也想去火车站,两人决定一起打车最后的价钱是怎样的结果?也许15-20块钱就可以搞定问题,而不是最终的25块。而事实上,如果大家都具有这样的能力,我想对的士司机来说也可以增长盈利,因为更多的打车需求会让他们的单数变多从而增加总的流水。

回到某拼车公司的话题,目前假设从阿里巴巴西溪总部出发到杭州东站(路径A),一个人打车的费用是100,那么第二个人拼车也是到杭州东站附近(路径B),这时候他可能需要付的价钱是90块钱,也就是说总价190块钱。大家是否认为司机会拿到这部分的差额呢?事实上,的士司机只拿到了他们共享路程的费用((A /union B)* 20%),而不是((A + B) * 20%),如果A和B完全相等的话,那么司机基本上不会拿到更多的钱,这部分多出来的利润就被某拼车公司完全拿去了。 为什么某拼车公司会这么做而且敢这么做呢?因为他们不但垄断了共享车的平台,也垄断了信息分享的平台,一个人在上车之前他是不知道另外一个人跟在类似的时间段去类似的地方的。如若这两个人在上车前就已经知道了对方的目的地,并联合起来打一辆车的话,那么这个博弈的格局就完全不同了。我们写此文的目的就是要分析真实世界中这样的需求是否真实存在,值不值得我们投入精力去开发或者利用一个已有的信息平台让有类似出行需求的人在按下打车”的按钮前就找到对方,从而增加议价的权利。

本文使用的数据来自于Todd Scheider维护的纽约的士数据[1],在此文中只分析Yellow Cab的数据,因为其时间跨度较长(2009-2016),同时覆盖纽约市区的范围也更广(所有纽约5个大区)。使用的阿里云大数据的技术有:MaxCompute的Tunnel,Sql,UDF,MapReduce,Graph和quick BI。实验机为阿里云的ECS最低配的机器。所有开发实验工作均在公有云上进行。本文的结构如下:第二节将介绍数据分析的技术细节,第三节为实验结果分析。

技术方案

数据导入

首先我们将csv格式的数据使用Tunnel导入到ODPS表中,使用的表的schema如下:

create table nyc_taxi_raw_small (vid                   bigint,
                                 vendor_name           string,
                                 Trip_Pickup_DateTime  string,
                                 Trip_Dropoff_DateTime string,
                                 Passenger_Count       string,
                                 Trip_Distance         double,
                                 Start_Lon             double,
                                 Start_Lat             double,
                                 Rate_Code             string,
                                 store_and_forward     string,
                                 End_Lon               double,
                                 End_Lat               double,
                                 Payment_Type          string,
                                 Fare_Amt              double,
                                 surcharge             double,
                                 mta_tax               double,
                                 Tip_Amt               double,
                                 Tolls_Amt             double,
                                 Total_Amt             double);

我们需要给每一个事件指定一个唯一的ID,用于后续的图分析,唯一ID的指定我们可以引用用[2]中的技术,但是当数据量比较大时,这个方法无法保证ID的唯一性,经过一系列调研后,发现这个ID生成在ODPS中是一个比较难的问题,所以我选择在tunnel导入之前就计算好每个记录的ID,使用的Tunnel导入的script如下:

touch ../data/experiments/counts.txt
for YEAR in 2009 2010 2011 2012 2013 2014 2015 2016
do
    for MONTH in 01 02 03 04 05 06 07 08 09 10 11 12
    do
        wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_${YEAR}-${MONTH}.csv -O /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
        python ../python/add_vid.py /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv /home/zhaoming/NYC_TAXI_DATA/data/experiments/counts.txt
        /home/zhaoming/odps_console/bin/odpscmd -e "tunnel upload /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out nyc_taxi_raw_small -dbr true;"
        rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
        rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out
    done
done

计算vid的python代码如下:

import sys

class VidAppender:

    def __init__(self, infile, cntfile):
        self.infile  = infile
        self.oufile  = infile + ".out"
        self.cntfile = cntfile
        reader = open(cntfile, "r")
        self.count  = 0
        for line in reader:
            self.count += int(line.replace("\n", ""))

    def process(self):
        reader = open(self.infile, "r")
        writer = open(self.oufile, "w")
        cnt = 0
        for line in reader:
            try:
                if line.startswith("vendor_name") or line.startswith("^M") or line.startswith("VendorID"):
                    writer.write("vid,"+line)
                    continue
                writer.write(str(self.count)+","+line)
                cnt += 1
                self.count += 1
            except Exception,e:
                print Exception,":",e
                print line
                pass
        reader.close()
        writer.close()
        cntwriter = open(self.cntfile, "a")
        cntwriter.write(str(cnt)+"\n")
        cntwriter.close()

def main(argv):
    appender = VidAppender(argv[1], argv[2])
    appender.process()

if __name__ == "__main__":
    main(sys.argv)

这样我们得到的数据一共有:866,796,462条数据。       

数据清洗和图生成

首先我们要建立一张表来存储需要被计算的内容:

create table nyc_taxi_data (vid                   bigint, 
                            trip_pickup_datetime  string, 
                            trip_dropoff_datetime string, 
                            start_lon             double, 
                            start_lat             double, 
                            end_lon               double, 
                            end_lat               double);

并且将数据注入:

insert overwrite table nyc_taxi_data 
    select vid, 
           trip_pickup_datetime, 
           trip_dropoff_datetime, 
           start_lon,
           start_lat, 
           end_lon, 
           end_lat 
from nyc_taxi_raw_small;

我们使用一个点(Vertex)来表示一个打车事件,假设两个打车事件之间的起始时间在100秒内,起始距离在200米内,终点距离在500米内,我们认为这两个打车事件具备拼车的可能性(这个标准可以调整,但是我认为这个标准已经比较严格)。那么我们用一条边(Edge)将这两个点连接起来,将所有可能拼车的点用边相连,我们便得到了一个图(Graph)。图的schema如下所示:

 

生成图和进行数据清理我们使用MapReduce来进行,Mapper和Reducer的代码如下显示:

public void map(long recordNum, Record record, TaskContext context) throws IOException {
		String pickup = record.getString(1);
		String keyStr = pickup.split(" ")[0] + "-" + pickup.split(" ")[1].split(":")[0];
		key.set("pt", keyStr);
		value.set("time", getTS(pickup));
		value.set("vid", record.getBigint(0));
		value.set("start_lon", record.getDouble(3));
		value.set("start_lat", record.getDouble(4));
		value.set("end_lon", record.getDouble(5));
		value.set("end_lat", record.getDouble(6));
		try {
			if (record.getDouble(3) != 0 &&
			    record.getDouble(4) != 0 &&
			    record.getDouble(5) != 0 &&
			    record.getDouble(6) != 0) {
				context.write(key, value);
			}
		} catch (NullPointerException e) {
			System.out.println("record is broken!");
		}
	}
Mapper做的事情很简单,就是生成一个以小时为单位的Key,进行数据清洗(坐标值不能为0)同时将时间转换成为Time Stamp。
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
		Vector<Event> v = new Vector<Event>();
		HashMap<Long, Vector<Long>> hmap = new HashMap<Long, Vector<Long>>();
		while (values.hasNext()) {
			Record r = values.next();
			Event e = new Event(r.getBigint("vid"), r.getBigint("time"), r.getDouble("start_lon"),
					r.getDouble("start_lat"), r.getDouble("end_lon"), r.getDouble("end_lat"));
			v.add(e);
		}

		Collections.sort(v);

		for (int i = 0; i < v.size(); i++) {
			for (int j = i + 1; j < v.size(); j++) {
				long time_diff = cal_time(v.get(i), v.get(j));
				double from_dist = cal_from_dist(v.get(i), v.get(j));
				double to_dist = cal_to_dist(v.get(i), v.get(j));
				if (time_diff < 100) {
					if (from_dist < 200 && to_dist < 500) {
						long from = v.get(i).vid;
						long to = v.get(j).vid;
						if (hmap.get(from) == null)
							hmap.put(from, new Vector<Long>());
						if (hmap.get(to) == null)
							hmap.put(to, new Vector<Long>());
						hmap.get(from).add(to);
						hmap.get(to).add(from);
					}
				} else {
					break;
				}
			}
		}

		Set set = hmap.entrySet();
		Iterator iterator = set.iterator();
		while (iterator.hasNext()) {
			String ret = "";
			Map.Entry mentry = (Map.Entry) iterator.next();
			Vector<Long> tmp = hmap.get(mentry.getKey());
			for (int j = 0; j < tmp.size(); j++) {
				if (j != tmp.size() - 1) {
					ret += Long.toString(tmp.get(j)) + ":1,";
				} else
					ret += Long.toString(tmp.get(j)) + ":1";
			}
			output.set(0, mentry.getKey());
			output.set(1, ret);
			context.write(output);
		}
	}

Reducer则负责将本Key内的所有数据进行距离计算(起始时间,起始位置,终点位置),并输出可连接的点。这里面为了提升效率,我们将数据按照时间进行排序,超过时间范围的则不计算,事实上可以提升效率的方法有很多种,比如说使用R-Tree等等 [3]。将代码打包并进行计算的odps指令如下(注意这段代码是可以指定执行的mapper reducer个数的):

create resource jar /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar -f;
jar -resources prepare_graph.jar,log4j-1.2.17.jar,rt.jar -classpath /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar NYCTaxiDataTransform/NYCTaxiDataTransform nyc_taxi_data nyc_taxi_graph 256;

经过计算,一共有497,819,232个点和1,373,388,220条边,也就是说有这么多个打车事件与其它事件有拼车可能性。

拼车可能性计算

当两人拼车时,我们使用边即可以表达这个关系,三人拼车时三角形可以进行计算。但是这里面存在着一个问题,就是当一个点已经被算到属于某条边的拼车事件中去时,那么其在其它边上的拼车事件就不能被计算(我们在这里使用的策略是只有小id的点负责计算边)。对于三角形的计算也应该同样遵循这样的规则。首先计算边的算法我们叫做IndependentEdgeCount,其table schema为:
create table nyc_taxi_independent_edge(vid bigint, count bigint);
计算边数量的核心算法为:
public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
				throws IOException {
			if (context.getSuperstep() == 0L) {
				// sends a message with its ID to all its outgoing neighbors
				Tuple t = new Tuple();
				t.append(getId());
				context.sendMessageToNeighbors(this, t);
				
				boolean hasLess = false;
				int count = 0;
				for(int i=0; i<this.getValue().getAll().size();i++)
				{
					if(Long.parseLong(this.getValue().get(i).toString())<Long.parseLong(this.getId().toString()))
						hasLess = true;
				}
				if(!hasLess && getValue().getAll().size() != 0)
					count = 1;
				context.write(getId(), new LongWritable(count));
				this.voteToHalt();
					
			}
		}
用来保存三角形计数的table的schema为:
create table nyc_taxi_triangle(vid bigint, count bigint);
三角形的计算我们使用ODPS标准的例子,具体见[4],但是因为不能重复将已经计算的三角形作为拼车的例子,所以我们需要将算法进行改进,计算过的三角形不再列入进一步的计算中,同时因为我们使用的图为无向图,所以相比较[4]的例子,我们只需要两轮迭代,经过改进后的算法代码如下:
		public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
				throws IOException {

			if (context.getSuperstep() == 0L) {
				this.getValue().append(this.getId());
				context.sendMessageToNeighbors(this, getValue());
			} else if (context.getSuperstep() == 1L) {
				long my_v = Long.parseLong(this.getId().toString());
				int count = 0;
				for (Tuple msg : msgs) {
					long from_v = Long.parseLong(msg.getAll().get(msg.getAll().size() - 1).toString());
					for (int i = 0; i < msg.getAll().size() - 1; i++) {
						long inter_v = Long.parseLong(msg.getAll().get(i).toString());
						if (!this.getValue().getAll().contains((LongWritable) msg.getAll().get(i)) && my_v < from_v
								&& my_v < inter_v) {
							count = 1;
						}
					}
				}
				context.write(getId(), new LongWritable(count));
				this.voteToHalt();
			}
		}
计算独立三角形(边数)的odps命令为:
jar -resources prepare_graph.jar,log4j-1.2.17.jar,rt.jar -classpath /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar NYCTaxiDataGraphAnalysis/TriangleCount nyc_taxi_graph nyc_taxi_triangle;

数据分析

我们找到的独立边的个数为 168,841,988,独立三角形的个数为 102,091,976,这样可以推论在现有的拼车标准下,可拼车倾向的比例为:
两人拼车: 168,841,988*2/ 866,796,462 = 38.96%
我们想要分析具体在哪个时间段的拼车需求比较多,那么先需要把这个独立边和独立三角形的信息映射回原表上,具体使用JOIN操作:
create table nyc_taxi_data_join_independent_edge 
like nyc_taxi_data;

insert overwrite table nyc_taxi_data_join_independent_edge 
select a.vid, 
       a.trip_pickup_datetime, 
       a.trip_dropoff_datetime, 
       a.start_lon, 
       a.start_lat, 
       a.end_lon, 
       a.end_lat 
from nyc_taxi_data a 
INNER JOIN 
(select * from nyc_taxi_independent_edge where count != 0) b 
on a.vid = b.vid;
在quick BI上建立一个SQL数据源:
select SUBSTR(trip_pickup_datetime, 12, 2) hour, 1 cnt
 from odps_zhaoming.nyc_taxi_data_join_independent_edge
得到的图表如下:
8c5164c1ac8ef1a22d60fc0589708d3f7155a71d
三人拼车: 102,091,976*3/ 866,796,462 = 35.33%
使用同样的流程获得的图为:
3a3039c4be8fb862601c89cbbfdf54ea2009f4b5
可以看到,两人和三人拼车基本遵循类似的规律,就是在晚上7,8,9点时下班时左右达到高峰,不同的是,三人拼车在早上7,8,9点左右会有一个与下午类似的高峰。


引用

[1] https://github.com/toddwschneider/nyc-taxi-data 

[2] MaxCompute SQL Row_Sequence 实现列自增长 https://yq.aliyun.com/articles/118901?spm=5176.8091938.0.0.CxYtZS

[3] Guttman, A. (1984). "R-Trees: A Dynamic Index Structure for Spatial Searching". 

[4] 开放数据处理服务ODPS Graph用户指南

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
23天前
|
存储 大数据 测试技术
用于大数据分析的数据存储格式:Parquet、Avro 和 ORC 的性能和成本影响
在大数据环境中,数据存储格式直接影响查询性能和成本。本文探讨了 Parquet、Avro 和 ORC 三种格式在 Google Cloud Platform (GCP) 上的表现。Parquet 和 ORC 作为列式存储格式,在压缩和读取效率方面表现优异,尤其适合分析工作负载;Avro 则适用于需要快速写入和架构演化的场景。通过对不同查询类型(如 SELECT、过滤、聚合和联接)的基准测试,本文提供了在各种使用案例中选择最优存储格式的建议。研究结果显示,Parquet 和 ORC 在读取密集型任务中更高效,而 Avro 更适合写入密集型任务。正确选择存储格式有助于显著降低成本并提升查询性能。
124 1
用于大数据分析的数据存储格式:Parquet、Avro 和 ORC 的性能和成本影响
|
2月前
|
分布式计算 Hadoop 大数据
Jupyter 在大数据分析中的角色
【8月更文第29天】Jupyter Notebook 提供了一个交互式的开发环境,它不仅适用于 Python 编程语言,还能够支持其他语言,包括 Scala 和 R 等。这种多语言的支持使得 Jupyter 成为大数据分析领域中非常有价值的工具,特别是在与 Apache Spark 和 Hadoop 等大数据框架集成方面。本文将探讨 Jupyter 如何支持这些大数据框架进行高效的数据处理和分析,并提供具体的代码示例。
49 0
|
7天前
|
JSON 数据可视化 数据挖掘
Polars函数合集大全:大数据分析的新利器
Polars函数合集大全:大数据分析的新利器
15 1
|
20天前
|
存储 分布式计算 Hadoop
大数据分析的工具
大数据是一个含义广泛的术语,是指数据集,如此庞大而复杂的,他们需要专门设计的硬件和软件工具进行处理。该数据集通常是万亿或EB的大小。这些数据集收集自各种各样的来源:传感器,气候信息,公开的信息,如杂志,报纸,文章。大数据产生的其他例子包括购买交易记录,网络日志,病历,军事监控,视频和图像档案,及大型电子商务。
27 8
|
1月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
82 11
|
2月前
|
分布式计算 数据可视化 大数据
Vaex :突破pandas,快速分析100GB大数据集
Vaex :突破pandas,快速分析100GB大数据集
|
2月前
|
大数据 机器人 数据挖掘
这个云ETL工具配合Python轻松实现大数据集分析,附案例
这个云ETL工具配合Python轻松实现大数据集分析,附案例
|
2月前
|
数据采集 人工智能 安全
AI大数据处理与分析实战--体育问卷分析
本文是关于使用AI进行大数据处理与分析的实战案例,详细记录了对深圳市义务教育阶段学校“每天一节体育课”网络问卷的分析过程,包括数据概览、交互Prompt、代码处理、年级和学校维度的深入分析,以及通过AI工具辅助得出的分析结果和结论。
|
2月前
|
消息中间件 前端开发 安全
第三方数据平台技术选型分析
这篇文章分析了第三方数据平台的技术选型,涵盖了移动统计平台、自助分析平台和BI平台的不同代表厂商,讨论了它们的数据源、使用要求和适用场景。
40 2
|
2月前
|
存储 JSON 关系型数据库
MySQL与JSON的邂逅:开启大数据分析新纪元
MySQL与JSON的邂逅:开启大数据分析新纪元

热门文章

最新文章

下一篇
无影云桌面