文章目录
Bag:半结构数据的可并行化计算列表
Dask-bag 擅长处理那些,可以表示为无序序列的数据。一般我们将其称为:“混乱” 数据,因为它包括了负载的嵌入结构、缺失域、混合数据类型等。函数式 的编程风格非常适合标准的Python迭代,itertool
就是这方面很好的库。
在数据预处理阶段,特别是读取大量的原始数据时,常会出现“混乱”数据。数据集的原始格式可能是 Json、CSV、XML 或其他结构和数据类型没有严格限制的数据。为此,对于这些数据的表示、读取和处理,通常使用 Python 的列表、集合和字典。
Bag 在上述 Python 数据类型的基础上,对通用存储和处理进行了优化。通过迭代器/生成器表达式或 itertools
或toolz
等库添加流计算,可以让我们在较小的空间中处理大量数据。
应该说,Dask.Bag 是一种集成了抽象数据结构 + 并行处理的数据类型,简而言之:
dask.bag = map, filter, toolz + 并行计算
相关文档
创建数据集
%run prep.py -d accounts
启动分布式调度器
from dask.distributed import Client
client = Client(n_workers=4)
Bag 创建
你可以从 Python 序列中、文件中、S3 文件中创建 Bag。并用 .take() 方法提取 Bag 中的块:
import dask.bag as db
# 元素是整数
b = db.from_sequence([1,2,3,4,5,6,7,8,9,10], npartitions=2)
b.take(3)
# 元素是一个 text 文件,其中文件的每一行都是一个 Json 对象
# 注意,自动解压的
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
b.take(1)
注意:数据被分割成块了,第一个例子中,序列被分为两块。第二个例子中,每一个文件作为一个块。
也可以获取远程数据,以下是从一个 s3fs 库中,读取远程的 CSV 文件并分块
import sources
sources.bag_url
# Requires `s3fs` library
# each partition is a remote CSV text file
b = db.read_text(sources.bag_url,
storage_options={'anon': True})
b.take(1)
操作
Bag 对象兼用许多 Pyhon 基本库,如toolz
或 pyspark
中的 API,包括map
, filter
, groupby
等。
对 Bag 对象的运算将返回新的 Bag 对象。并调用 .compute() 方法触发执行(见手册第一章)
def is_even(n):
return n % 2 == 0
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c
c.compute()
示例:账户 JSON 数据
我们已经在 data 文件夹下创建了一个 虚拟的 JSON 数据集,这些数据根据同一个 id 将条目打包在一起,作为一条记录。这些数据类似于文档存储数据库,或从 Web API 上获取的数据。
每一条数据都是一个 JSON,并用带有如下 key 的字典编码:
- id: 用户的 id
- name: 用户名
- transactions: 转账记录,是一个包含
transaction-id
,amount
的 list 对(list pair)
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
lines.take(3)
可以看到,数据是一行行的文本,且在读取文件时,自动进行的解压操作。为了让数据更加直观,我们可以用调用 map 方法,并用 json.load 函数:
import json
js = lines.map(json.loads)
# take: inspect first few elements
js.take(3)
基本查询
Map、filter、pluck
一旦将 JSON 数据表达为 Python 对象(dict 和 list)时,我们就能用 Python 函数来做一些简单的查询。
js.filter(lambda record: record['name'] == 'Alice').take(5)
def count_transactions(d):
return {'name': d['name'], 'count': len(d['transactions'])}
# map: 非常类似于 Python 的BIF map()
(js.filter(lambda record: record['name'] == 'Alice')
.map(count_transactions)
.take(5))
# pluck: select a field, as from a dictionary, element[field]
# pluck 函数从多个字典中,选择相同的 key 对应的 value
(js.filter(lambda record: record['name'] == 'Alice')
.map(count_transactions)
.pluck('count')
.take(5))
# 计算 Alice 的平均交易量
(js.filter(lambda record: record['name'] == 'Alice')
.map(count_transactions)
.pluck('count')
.mean()
.compute())
flatten
用 .flattern() 方法,将嵌套数据用平均值解嵌套:
(js.filter(lambda record: record['name'] == 'Alice')
.pluck('transactions')
.take(3))
# 用 flatten()
(js.filter(lambda record: record['name'] == 'Alice')
.pluck('transactions')
.flatten()
.take(3))
(js.filter(lambda record: record['name'] == 'Alice')
.pluck('transactions')
.flatten()
.pluck('amount')
.mean()
.compute())
Groupby 和 Foldby
Groupby
通常我们需要用函数或键来对数据进行打包:为此,我们可以用 groupby 和 folder 实现。前者会对所有数据根据 group 打乱数据原有顺序,所以消耗资源较多。后者仅仅是顺序走一遍地同时记录结果,所以运行较快。
-
groupby
: 对数据进行重新排序,进而使那些属于相同 group 的项目能在一个 键-值 对中。 -
foldby
: 走一遍数据,同时收集结果
Note: groupby 的效率是非常低的,建议用 folder 或用 dataframe 的相关方法.
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute() # names grouped by length
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()
b.groupby(lambda x: x % 2).starmap(lambda k, v: (k, max(v))).compute()
foldby
Foldby 对初学者而言可能会很奇葩,实际上它和如下函数是类似的:
其参数大致如下:
- 用于 group 的函数
- 用于对每一个 group 降维的二进制操作器
- 用于结合不同块,但具有相同的 group 的数据
我们可以比较一下 groupby 和 foldby:
%%time
# 注意,需要等一会
result = js.groupby(lambda item: item['name']).starmap(lambda k, v: (k, len(v))).compute()
print(sorted(result))
结果用了 27 s
%%time
from operator import add
def incr(tot, _):
return tot + 1
result = js.foldby(key='name',
binop=incr,
initial=0,
combine=add,
combine_initial=0).compute()
print(sorted(result))
仅用了 5.6 s。
DataFrame
就像 pandas.DataFrame 之于 Python 内置的数据结构一般,dask.dataframe 之于 dask.bag 也一样,都是更加统一、结构化且高效化。从 Bag 的角度来看,一旦数据可以转为 dataframe,那么那种复杂的 split-applly-combine 逻辑,就会更加直接和高效。
你可以将一个只包含元组、字典或列表的 bag,通过调用 .to_dataframe() 的方法,转换为一个 dask.dataframe:
df1 = js.to_dataframe()
df1.head()
可以看到,数据转换为一个 Dataframe。于是我们就可以像用 pandas 那样,对数据进行高效地处理了。
当使用 dask.dataframe.groupby() 时,会比 dask.bag.groupby() 少一个数量级的运行时间,但依旧比不上 dask.bag.foldby()
%time df1.groupby('name').id.count().compute().head()
运行时间为 8 s。
提高效率!
上述的 Dataframe 格式实际上还不是最优的。因为在 transaction 一列中,依旧是一个嵌套的字典。所以该列的数据类型为 object。而一般地,为了提高效率,我们通常要求数据类型为 int、float、string,因此,我们还需要进行进一步地处理:
def denormalize(record):
# returns a list for each person, one item per transaction
return [{'id': record['id'],
'name': record['name'],
'amount': transaction['amount'],
'transaction-id': transaction['transaction-id']}
for transaction in record['transactions']]
transactions = js.map(denormalize).flatten()
transactions.take(3)
df = transactions.to_dataframe()
df.head()
缺点
Bag 提供了许多通用的计算,并兼用许多 Python 写的函数,但方便的同时也有代价:
- Bag 的运算普遍慢于 array 和 dataframe;就像 Python 内置类型慢于 Numpy 和 Pandas。
- Bag.groupby 很慢,而 Bag.foldby 很伤脑筋。如果有可能的话,还是选择 dataframe 的 groupby 吧。