前言
最近几年以来出现的共享的士(Uber,Lyft,滴滴)给人们的出行带来了极大的便利。随着烧钱大战的结束,中美市场大局已定,为了维持高估值(Uber 80 Billion $, 滴滴30 Billion $),缩减亏损,增长净利润,继而进入上市流程,几大公司都开始发掘盈利的规则。带来的影响是,共享出行的用户们发现:1)车越来越难打,价钱越来越高;2)使用拼车会大幅度增长时间损耗,而带来的金钱节约却并不明显;3)司机发现盈利有限,真正愿意开车的司机越来越少,某拼车公司正在慢慢的转变成为一个的士信息服务平台。这与其做成让用户通过手机便捷,实惠出行的愿景大相径庭。
我想说一个故事,作为这篇文章所要解决的的一个问题的引子。那天天气炎热,我正在公交等去高铁站的公共汽车,由于太热,我决定打一辆车,当我拦下来一辆车后跟司机说15块钱到车站,司机答应了。而此时跟我同时等公交的另外一个陌生人也过来问我们要到哪里去,当他得知目的地是火车站以后,表示也想搭车,这时候司机坐地起价要他加10块钱。这位陌生人想想觉得可以,就加了10块钱给他。设想一下,如果我在拦的士之前就知道这个陌生人也想去火车站,两人决定一起打车最后的价钱是怎样的结果?也许15-20块钱就可以搞定问题,而不是最终的25块。而事实上,如果大家都具有这样的能力,我想对的士司机来说也可以增长盈利,因为更多的打车需求会让他们的单数变多从而增加总的流水。
回到某拼车公司的话题,目前假设从阿里巴巴西溪总部出发到杭州东站(路径A),一个人打车的费用是100,那么第二个人拼车也是到杭州东站附近(路径B),这时候他可能需要付的价钱是90块钱,也就是说总价190块钱。大家是否认为司机会拿到这部分的差额呢?事实上,的士司机只拿到了他们共享路程的费用((A /union B)* 20%),而不是((A + B) * 20%),如果A和B完全相等的话,那么司机基本上不会拿到更多的钱,这部分多出来的利润就被某拼车公司完全拿去了。 为什么某拼车公司会这么做而且敢这么做呢?因为他们不但垄断了共享车的平台,也垄断了信息分享的平台,一个人在上车之前他是不知道另外一个人跟在类似的时间段去类似的地方的。如若这两个人在上车前就已经知道了对方的目的地,并联合起来打一辆车的话,那么这个博弈的格局就完全不同了。我们写此文的目的就是要分析真实世界中这样的需求是否真实存在,值不值得我们投入精力去开发或者利用一个已有的信息平台让有类似出行需求的人在按下打车”的按钮前就找到对方,从而增加议价的权利。
本文使用的数据来自于Todd Scheider维护的纽约的士数据[1],在此文中只分析Yellow Cab的数据,因为其时间跨度较长(2009-2016),同时覆盖纽约市区的范围也更广(所有纽约5个大区)。使用的阿里云大数据的技术有:MaxCompute的Tunnel,Sql,UDF,MapReduce,Graph和quick BI。实验机为阿里云的ECS最低配的机器。所有开发实验工作均在公有云上进行。本文的结构如下:第二节将介绍数据分析的技术细节,第三节为实验结果分析。
技术方案
数据导入
首先我们将csv格式的数据使用Tunnel导入到ODPS表中,使用的表的schema如下:
create table nyc_taxi_raw_small (vid bigint,
vendor_name string,
Trip_Pickup_DateTime string,
Trip_Dropoff_DateTime string,
Passenger_Count string,
Trip_Distance double,
Start_Lon double,
Start_Lat double,
Rate_Code string,
store_and_forward string,
End_Lon double,
End_Lat double,
Payment_Type string,
Fare_Amt double,
surcharge double,
mta_tax double,
Tip_Amt double,
Tolls_Amt double,
Total_Amt double);
我们需要给每一个事件指定一个唯一的ID,用于后续的图分析,唯一ID的指定我们可以引用用[2]中的技术,但是当数据量比较大时,这个方法无法保证ID的唯一性,经过一系列调研后,发现这个ID生成在ODPS中是一个比较难的问题,所以我选择在tunnel导入之前就计算好每个记录的ID,使用的Tunnel导入的script如下:
touch ../data/experiments/counts.txt
for YEAR in 2009 2010 2011 2012 2013 2014 2015 2016
do
for MONTH in 01 02 03 04 05 06 07 08 09 10 11 12
do
wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_${YEAR}-${MONTH}.csv -O /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
python ../python/add_vid.py /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv /home/zhaoming/NYC_TAXI_DATA/data/experiments/counts.txt
/home/zhaoming/odps_console/bin/odpscmd -e "tunnel upload /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out nyc_taxi_raw_small -dbr true;"
rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv
rm /home/zhaoming/NYC_TAXI_DATA/data/experiments/yellow_tripdata_${YEAR}-${MONTH}.csv.out
done
done
计算vid的python代码如下:
import sys
class VidAppender:
def __init__(self, infile, cntfile):
self.infile = infile
self.oufile = infile + ".out"
self.cntfile = cntfile
reader = open(cntfile, "r")
self.count = 0
for line in reader:
self.count += int(line.replace("\n", ""))
def process(self):
reader = open(self.infile, "r")
writer = open(self.oufile, "w")
cnt = 0
for line in reader:
try:
if line.startswith("vendor_name") or line.startswith("^M") or line.startswith("VendorID"):
writer.write("vid,"+line)
continue
writer.write(str(self.count)+","+line)
cnt += 1
self.count += 1
except Exception,e:
print Exception,":",e
print line
pass
reader.close()
writer.close()
cntwriter = open(self.cntfile, "a")
cntwriter.write(str(cnt)+"\n")
cntwriter.close()
def main(argv):
appender = VidAppender(argv[1], argv[2])
appender.process()
if __name__ == "__main__":
main(sys.argv)
这样我们得到的数据一共有:866,796,462条数据。
数据清洗和图生成
首先我们要建立一张表来存储需要被计算的内容:
create table nyc_taxi_data (vid bigint,
trip_pickup_datetime string,
trip_dropoff_datetime string,
start_lon double,
start_lat double,
end_lon double,
end_lat double);
并且将数据注入:
insert overwrite table nyc_taxi_data
select vid,
trip_pickup_datetime,
trip_dropoff_datetime,
start_lon,
start_lat,
end_lon,
end_lat
from nyc_taxi_raw_small;
我们使用一个点(Vertex)来表示一个打车事件,假设两个打车事件之间的起始时间在100秒内,起始距离在200米内,终点距离在500米内,我们认为这两个打车事件具备拼车的可能性(这个标准可以调整,但是我认为这个标准已经比较严格)。那么我们用一条边(Edge)将这两个点连接起来,将所有可能拼车的点用边相连,我们便得到了一个图(Graph)。图的schema如下所示:
生成图和进行数据清理我们使用MapReduce来进行,Mapper和Reducer的代码如下显示:
public void map(long recordNum, Record record, TaskContext context) throws IOException {
String pickup = record.getString(1);
String keyStr = pickup.split(" ")[0] + "-" + pickup.split(" ")[1].split(":")[0];
key.set("pt", keyStr);
value.set("time", getTS(pickup));
value.set("vid", record.getBigint(0));
value.set("start_lon", record.getDouble(3));
value.set("start_lat", record.getDouble(4));
value.set("end_lon", record.getDouble(5));
value.set("end_lat", record.getDouble(6));
try {
if (record.getDouble(3) != 0 &&
record.getDouble(4) != 0 &&
record.getDouble(5) != 0 &&
record.getDouble(6) != 0) {
context.write(key, value);
}
} catch (NullPointerException e) {
System.out.println("record is broken!");
}
}
Mapper做的事情很简单,就是生成一个以小时为单位的Key,进行数据清洗(坐标值不能为0)同时将时间转换成为Time Stamp。public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
Vector<Event> v = new Vector<Event>();
HashMap<Long, Vector<Long>> hmap = new HashMap<Long, Vector<Long>>();
while (values.hasNext()) {
Record r = values.next();
Event e = new Event(r.getBigint("vid"), r.getBigint("time"), r.getDouble("start_lon"),
r.getDouble("start_lat"), r.getDouble("end_lon"), r.getDouble("end_lat"));
v.add(e);
}
Collections.sort(v);
for (int i = 0; i < v.size(); i++) {
for (int j = i + 1; j < v.size(); j++) {
long time_diff = cal_time(v.get(i), v.get(j));
double from_dist = cal_from_dist(v.get(i), v.get(j));
double to_dist = cal_to_dist(v.get(i), v.get(j));
if (time_diff < 100) {
if (from_dist < 200 && to_dist < 500) {
long from = v.get(i).vid;
long to = v.get(j).vid;
if (hmap.get(from) == null)
hmap.put(from, new Vector<Long>());
if (hmap.get(to) == null)
hmap.put(to, new Vector<Long>());
hmap.get(from).add(to);
hmap.get(to).add(from);
}
} else {
break;
}
}
}
Set set = hmap.entrySet();
Iterator iterator = set.iterator();
while (iterator.hasNext()) {
String ret = "";
Map.Entry mentry = (Map.Entry) iterator.next();
Vector<Long> tmp = hmap.get(mentry.getKey());
for (int j = 0; j < tmp.size(); j++) {
if (j != tmp.size() - 1) {
ret += Long.toString(tmp.get(j)) + ":1,";
} else
ret += Long.toString(tmp.get(j)) + ":1";
}
output.set(0, mentry.getKey());
output.set(1, ret);
context.write(output);
}
}
Reducer则负责将本Key内的所有数据进行距离计算(起始时间,起始位置,终点位置),并输出可连接的点。这里面为了提升效率,我们将数据按照时间进行排序,超过时间范围的则不计算,事实上可以提升效率的方法有很多种,比如说使用R-Tree等等 [3]。将代码打包并进行计算的odps指令如下(注意这段代码是可以指定执行的mapper reducer个数的):
create resource jar /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar -f;
jar -resources prepare_graph.jar,log4j-1.2.17.jar,rt.jar -classpath /Users/stplaydog/gitlocal/odps_clt/jars/prepare_graph.jar NYCTaxiDataTransform/NYCTaxiDataTransform nyc_taxi_data nyc_taxi_graph 256;
经过计算,一共有497,819,232个点和1,373,388,220条边,也就是说有这么多个打车事件与其它事件有拼车可能性。
拼车可能性计算
create table nyc_taxi_independent_edge(vid bigint, count bigint);
public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
throws IOException {
if (context.getSuperstep() == 0L) {
// sends a message with its ID to all its outgoing neighbors
Tuple t = new Tuple();
t.append(getId());
context.sendMessageToNeighbors(this, t);
boolean hasLess = false;
int count = 0;
for(int i=0; i<this.getValue().getAll().size();i++)
{
if(Long.parseLong(this.getValue().get(i).toString())<Long.parseLong(this.getId().toString()))
hasLess = true;
}
if(!hasLess && getValue().getAll().size() != 0)
count = 1;
context.write(getId(), new LongWritable(count));
this.voteToHalt();
}
}
三角形的计算我们使用ODPS标准的例子,具体见[4],但是因为不能重复将已经计算的三角形作为拼车的例子,所以我们需要将算法进行改进,计算过的三角形不再列入进一步的计算中,同时因为我们使用的图为无向图,所以相比较[4]的例子,我们只需要两轮迭代,经过改进后的算法代码如下:
public void compute(ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context, Iterable<Tuple> msgs)
throws IOException {
if (context.getSuperstep() == 0L) {
this.getValue().append(this.getId());
context.sendMessageToNeighbors(this, getValue());
} else if (context.getSuperstep() == 1L) {
long my_v = Long.parseLong(this.getId().toString());
int count = 0;
for (Tuple msg : msgs) {
long from_v = Long.parseLong(msg.getAll().get(msg.getAll().size() - 1).toString());
for (int i = 0; i < msg.getAll().size() - 1; i++) {
long inter_v = Long.parseLong(msg.getAll().get(i).toString());
if (!this.getValue().getAll().contains((LongWritable) msg.getAll().get(i)) && my_v < from_v
&& my_v < inter_v) {
count = 1;
}
}
}
context.write(getId(), new LongWritable(count));
this.voteToHalt();
}
}
数据分析
create table nyc_taxi_data_join_independent_edge
like nyc_taxi_data;
insert overwrite table nyc_taxi_data_join_independent_edge
select a.vid,
a.trip_pickup_datetime,
a.trip_dropoff_datetime,
a.start_lon,
a.start_lat,
a.end_lon,
a.end_lat
from nyc_taxi_data a
INNER JOIN
(select * from nyc_taxi_independent_edge where count != 0) b
on a.vid = b.vid;
在quick BI上建立一个SQL数据源:select SUBSTR(trip_pickup_datetime, 12, 2) hour, 1 cnt
from odps_zhaoming.nyc_taxi_data_join_independent_edge
得到的图表如下:三人拼车:102,091,976*3/866,796,462 = 35.33%
引用
[1] https://github.com/toddwschneider/nyc-taxi-data
[2] MaxCompute SQL Row_Sequence 实现列自增长 https://yq.aliyun.com/articles/118901?spm=5176.8091938.0.0.CxYtZS
[3] Guttman, A. (1984). "R-Trees: A Dynamic Index Structure for Spatial Searching".