使用MaxCompute进行纽约的士拼车分析

前言

最近几年以来出现的共享的士(Uber,Lyft,滴滴)给人们的出行带来了极大的便利。随着烧钱大战的结束,中美市场大局已定,为了维持高估值(Uber 80 Billion $, 滴滴30 Billion $),缩减亏损,增长净利润,继而进入上市流程,几大公司都开始发掘盈利的规则。带来的影响是,共享出行的用户们发现:1)车越来越难打,价钱越来越高;2)使用拼车会大幅度增长时间损耗,而带来的金钱节约却并不明显;3)司机发现盈利有限,真正愿意开车的司机越来越少,某拼车公司正在慢慢的转变成为一个的士信息服务平台。这与其做成让用户通过手机便捷,实惠出行的愿景大相径庭。

我想说一个故事,作为这篇文章所要解决的的一个问题的引子。那天天气炎热,我正在公交等去高铁站的公共汽车,由于太热,我决定打一辆车,当我拦下来一辆车后跟司机说15块钱到车站,司机答应了。而此时跟我同时等公交的另外一个陌生人也过来问我们要到哪里去,当他得知目的地是火车站以后,表示也想搭车,这时候司机坐地起价要他加10块钱。这位陌生人想想觉得可以,就加了10块钱给他。设想一下,如果我在拦的士之前就知道这个陌生人也想去火车站,两人决定一起打车最后的价钱是怎样的结果?也许15-20块钱就可以搞定问题,而不是最终的25块。而事实上,如果大家都具有这样的能力,我想对的士司机来说也可以增长盈利,因为更多的打车需求会让他们的单数变多从而增加总的流水。

回到某拼车公司的话题,目前假设从阿里巴巴西溪总部出发到杭州东站(路径A),一个人打车的费用是100,那么第二个人拼车也是到杭州东站附近(路径B),这时候他可能需要付的价钱是90块钱,也就是说总价190块钱。大家是否认为司机会拿到这部分的差额呢?事实上,的士司机只拿到了他们共享路程的费用((A /union B)* 20%),而不是((A + B) * 20%),如果A和B完全相等的话,那么司机基本上不会拿到更多的钱,这部分多出来的利润就被某拼车公司完全拿去了。 为什么某拼车公司会这么做而且敢这么做呢?因为他们不但垄断了共享车的平台,也垄断了信息分享的平台,一个人在上车之前他是不知道另外一个人跟在类似的时间段去类似的地方的。如若这两个人在上车前就已经知道了对方的目的地,并联合起来打一辆车的话,那么这个博弈的格局就完全不同了。我们写此文的目的就是要分析真实世界中这样的需求是否真实存在,值不值得我们投入精力去开发或者利用一个已有的信息平台让有类似出行需求的人在按下打车”的按钮前就找到对方,从而增加议价的权利。

本文使用的数据来自于Todd Scheider维护的纽约的士数据[1],在此文中只分析Yellow Cab的数据,因为其时间跨度较长(2009-2016),同时覆盖纽约市区的范围也更广(所有纽约5个大区)。使用的阿里云大数据的技术有:MaxCompute的Tunnel,Sql,UDF,MapReduce,Graph和quick BI。实验机为阿里云的ECS最低配的机器。所有开发实验工作均在公有云上进行。本文的结构如下:第二节将介绍数据分析的技术细节,第三节为实验结果分析。

技术方案

数据导入

首先我们将csv格式的数据使用Tunnel导入到ODPS表中,使用的表的schema如下:

create table nyc_taxi_raw_small (vid                   bigint,
                                 vendor_name           string,
                                 Trip_Pickup_DateTime  string,
                                 Trip_Dropoff_DateTime string,
                                 Passenger_Count       string,
                                 Trip_Distance         double,
                                 Start_Lon             double,
                                 Start_Lat             double,
                                 Rate_Code             string,
                                 store_and_forward     string,
                                 End_Lon               double,
                                 End_Lat               double,
                                 Payment_Type          string,
                                 Fare_Amt              double,
                                 surcharge             double,
                                 mta_tax               double,
                                 Tip_Amt               double,
                                 Tolls_Amt             double,
                                 Total_Amt             double);

我们需要给每一个事件指定一个唯一的ID,用于后续的图分析,唯一ID的指定我们可以引用用[2]中的技术,但是当数据量比较大时,这个方法无法保证ID的唯一性,经过一系列调研后,发现这个ID生成在ODPS中是一个比较难的问题,所以我选择在tunnel导入之前就计算好每个记录的ID,使用的Tunnel导入的script如下:

touch ../data/experiments/counts.txt
for YEAR in 2009 2010 2011 2012 2013 2014 2015 2016
do
    for MONTH in 01 02 03 04 05 06 07 08 09 10 11 12
    do
        wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_${YEAR}-${MONTH}.csv -O /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
        python ../python/add_vid.py /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv /home/zhaoming/NYC_TAXI_DATA/data/experiments/counts.txt
        /home/zhaoming/odps_console/bin/odpscmd -e "tunnel upload /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out nyc_taxi_raw_small -dbr true;"
        rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
        rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out
    done
done

计算vid的python代码如下:

import sys

class VidAppender:

    def __init__(self, infile, cntfile):
        self.infile  = infile
        self.oufile  = infile + ".out"
        self.cntfile = cntfile
        reader = open(cntfile, "r")
        self.count  = 0
        for line in reader:
            self.count += int(line.replace("\n", ""))

    def process(self):
        reader = open(self.infile, "r")
        writer = open(self.oufile, "w")
        cnt = 0
        for line in reader:
            try:
                if line.startswith("vendor_name") or line.startswith("^M") or line.startswith("VendorID"):
                    writer.write("vid,"+line)
                    continue
                writer.write(str(self.count)+","+line)
                cnt += 1
                self.count += 1
            except Exception,e:
                print Exception,":",e
                print line
                pass
        reader.close()
        writer.close()
        cntwriter = open(self.cntfile, "a")
        cntwriter.write(str(cnt)+"\n")
        cntwriter.close()

def main(argv):
    appender = VidAppender(argv[1], argv[2])
    appender.process()

if __name__ == "__main__":
    main(sys.argv)

这样我们得到的数据一共有:866,796,462条数据。       

数据清洗和图生成

首先我们要建立一张表来存储需要被计算的内容:

create table nyc_taxi_data (vid                   bigint, 
                            trip_pickup_datetime  string, 
                            trip_dropoff_datetime string, 
                            start_lon             double, 
                            start_lat             double, 
                            end_lon               double, 
                            end_lat               double);

并且将数据注入:

insert overwrite table nyc_taxi_data 
    select vid, 
           trip_pickup_datetime, 
           trip_dropoff_datetime, 
           start_lon,
           start_lat, 
           end_lon, 
           end_lat 
from nyc_taxi_raw_small;

我们使用一个点(Vertex)来表示一个打车事件,假设两个打车事件之间的起始时间在100秒内,起始距离在200米内,终点距离在500米内,我们认为这两个打车事件具备拼车的可能性(这个标准可以调整,但是我认为这个标准已经比较严格)。那么我们用一条边(Edge)将这两个点连接起来,将所有可能拼车的点用边相连,我们便得到了一个图(Graph)。图的schema如下所示:


生成图和进行数据清理我们使用MapReduce来进行,Mapper和Reducer的代码如下显示:

public void map(long recordNum, Record record, TaskContext context) throws IOException {
		String pickup = record.getString(1);
		String keyStr = pickup.split(" ")[0] + "-" + pickup.split(" ")[1].split(":")[0];
		key.set("pt", keyStr);
		value.set("time", getTS(pickup));
		value.set("vid", record.getBigint(0));
		value.set("start_lon", record.getDouble(3));
		value.set("start_lat", record.getDouble(4));
		value.set("end_lon", record.getDouble(5));
		value.set("end_lat", record.getDouble(6));
		try {
			if (record.getDouble(3) != 0 &&
			    record.getDouble(4) != 0 &&
			    record.getDouble(5) != 0 &&
			    record.getDouble(6) != 0) {
				context.write(key, value);
			}
		} catch (NullPointerException e) {
			System.out.println("record is broken!");
		}
	}
Mapper做的事情很简单,就是生成一个以小时为单位的Key,进行数据清洗(坐标值不能为0)同时将时间转换成为Time Stamp。
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
		Vector<Event> v = new Vector<Event>();
		HashMap<Long, Vector<Long>> hmap = new HashMap<Long, Vector<Long>>();
		while (values.hasNext()) {
			Record r = values.next();
			Event e = new Event(r.getBigint("vid"), r.getBigint("time"), r.getDouble("start_lon"),
					r.getDouble("start_lat"), r.getDouble("end_lon"), r.getDouble("end_lat"));
			v.add(e);
		}

		Collections.sort(v);

		for (int i = 0; i < v.size(); i++) {
			for (int j = i + 1; j < v.size(); j++) {
				long time_diff = cal_time(v.get(i), v.get(j));
				double from_dist = cal_from_dist(v.get(i), v.get(j));
				double to_dist = cal_to_dist(v.get(i), v.get(j));
				if (time_diff < 100) {
					if (from_dist < 200 && to_dist < 500) {
						long from = v.get(i).vid;
						long to = v.get(j).vid;
						if (hmap.get(from) == null)
							hmap.put(from, new Vector<Long>());
						if (hmap.get(to) == null)
							hmap.put(to, new Vector<Long>());
						hmap.get(from).add(to);
						hmap.get(to).add(from);
					}
				} else {
					break;
				}
			}
		}

		Set set = hmap.entrySet();
		Iterator iterator = set.iterator();
		while (iterator.hasNext()) {
			String ret = "";
			Map.Entry mentry = (Map.Entry) iterator.next();
			Vector<Long> tmp = hmap.get(mentry.getKey());
			for (int j = 0; j < tmp.size(); j++) {
				if (j != tmp.size() - 1) {
					ret += Long.toString(tmp.get(j)) + ":1,";
				} else
					ret += Long.toString(tmp.get(j)) + ":1";
			}
			output.set(0, mentry.getKey());
			output.set(1, ret);
			context.write(output);
		}
	}

Reducer则负责将本Key内的所有数据进行距离计算(起始时间,起始位置,终点位置),并输出可连接的点。这里面为了提升效率,我们将数据按照时间进行排序,超过时间范围的则不计算,事实上可以提升效率的方法有很多种,比如说使用R-Tree等等 [3]。将代码打包并进行计算的odps指令如下(注意这段代码是可以指定执行的mapper reducer个数的):

create resource jar /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar -f;
jar -resources prepare_graph.jar,log4j-1.2.17.jar,rt.jar -classpath /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar NYCTaxiDataTransform/NYCTaxiDataTransform nyc_taxi_data nyc_taxi_graph 256;

经过计算,一共有497,819,232个点和1,373,388,220条边,也就是说有这么多个打车事件与其它事件有拼车可能性。

拼车可能性计算

当两人拼车时,我们使用边即可以表达这个关系,三人拼车时三角形可以进行计算。但是这里面存在着一个问题,就是当一个点已经被算到属于某条边的拼车事件中去时,那么其在其它边上的拼车事件就不能被计算(我们在这里使用的策略是只有小id的点负责计算边)。对于三角形的计算也应该同样遵循这样的规则。首先计算边的算法我们叫做IndependentEdgeCount,其table schema为:
create table nyc_taxi_independent_edge(vid bigint, count bigint);
计算边数量的核心算法为:
public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
				throws IOException {
			if (context.getSuperstep() == 0L) {
				// sends a message with its ID to all its outgoing neighbors
				Tuple t = new Tuple();
				t.append(getId());
				context.sendMessageToNeighbors(this, t);
				
				boolean hasLess = false;
				int count = 0;
				for(int i=0; i<this.getValue().getAll().size();i++)
				{
					if(Long.parseLong(this.getValue().get(i).toString())<Long.parseLong(this.getId().toString()))
						hasLess = true;
				}
				if(!hasLess && getValue().getAll().size() != 0)
					count = 1;
				context.write(getId(), new LongWritable(count));
				this.voteToHalt();
					
			}
		}
用来保存三角形计数的table的schema为:
create table nyc_taxi_triangle(vid bigint, count bigint);
三角形的计算我们使用ODPS标准的例子,具体见[4],但是因为不能重复将已经计算的三角形作为拼车的例子,所以我们需要将算法进行改进,计算过的三角形不再列入进一步的计算中,同时因为我们使用的图为无向图,所以相比较[4]的例子,我们只需要两轮迭代,经过改进后的算法代码如下:
		public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
				throws IOException {

			if (context.getSuperstep() == 0L) {
				this.getValue().append(this.getId());
				context.sendMessageToNeighbors(this, getValue());
			} else if (context.getSuperstep() == 1L) {
				long my_v = Long.parseLong(this.getId().toString());
				int count = 0;
				for (Tuple msg : msgs) {
					long from_v = Long.parseLong(msg.getAll().get(msg.getAll().size() - 1).toString());
					for (int i = 0; i < msg.getAll().size() - 1; i++) {
						long inter_v = Long.parseLong(msg.getAll().get(i).toString());
						if (!this.getValue().getAll().contains((LongWritable) msg.getAll().get(i)) && my_v < from_v
								&& my_v < inter_v) {
							count = 1;
						}
					}
				}
				context.write(getId(), new LongWritable(count));
				this.voteToHalt();
			}
		}
计算独立三角形(边数)的odps命令为:
jar -resources prepare_graph.jar,log4j-1.2.17.jar,rt.jar -classpath /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar NYCTaxiDataGraphAnalysis/TriangleCount nyc_taxi_graph nyc_taxi_triangle;

数据分析

我们找到的独立边的个数为168,841,988,独立三角形的个数为102,091,976,这样可以推论在现有的拼车标准下,可拼车倾向的比例为:
两人拼车:168,841,988*2/866,796,462 = 38.96%
我们想要分析具体在哪个时间段的拼车需求比较多,那么先需要把这个独立边和独立三角形的信息映射回原表上,具体使用JOIN操作:
create table nyc_taxi_data_join_independent_edge 
like nyc_taxi_data;

insert overwrite table nyc_taxi_data_join_independent_edge 
select a.vid, 
       a.trip_pickup_datetime, 
       a.trip_dropoff_datetime, 
       a.start_lon, 
       a.start_lat, 
       a.end_lon, 
       a.end_lat 
from nyc_taxi_data a 
INNER JOIN 
(select * from nyc_taxi_independent_edge where count != 0) b 
on a.vid = b.vid;
在quick BI上建立一个SQL数据源:
select SUBSTR(trip_pickup_datetime, 12, 2) hour, 1 cnt
 from odps_zhaoming.nyc_taxi_data_join_independent_edge
得到的图表如下:
使用MaxCompute进行纽约的士拼车分析
三人拼车:102,091,976*3/866,796,462 = 35.33%
使用同样的流程获得的图为:
使用MaxCompute进行纽约的士拼车分析
可以看到,两人和三人拼车基本遵循类似的规律,就是在晚上7,8,9点时下班时左右达到高峰,不同的是,三人拼车在早上7,8,9点左右会有一个与下午类似的高峰。


引用

[1] https://github.com/toddwschneider/nyc-taxi-data 

[2] MaxCompute SQL Row_Sequence 实现列自增长 https://yq.aliyun.com/articles/118901?spm=5176.8091938.0.0.CxYtZS

[3] Guttman, A. (1984). "R-Trees: A Dynamic Index Structure for Spatial Searching". 

[4] 开放数据处理服务ODPS Graph用户指南

上一篇:持续定义Saas模式云数据仓库+BI


下一篇:认识Linux下的硬盘分区