Dask 手册:02 Bag

文章目录

Bag:半结构数据的可并行化计算列表

Dask-bag 擅长处理那些,可以表示为无序序列的数据。一般我们将其称为:“混乱” 数据,因为它包括了负载的嵌入结构、缺失域、混合数据类型等。函数式 的编程风格非常适合标准的Python迭代,itertool 就是这方面很好的库。

在数据预处理阶段,特别是读取大量的原始数据时,常会出现“混乱”数据。数据集的原始格式可能是 Json、CSV、XML 或其他结构和数据类型没有严格限制的数据。为此,对于这些数据的表示、读取和处理,通常使用 Python 的列表、集合和字典。

Bag 在上述 Python 数据类型的基础上,对通用存储和处理进行了优化。通过迭代器/生成器表达式或 itertoolstoolz等库添加流计算,可以让我们在较小的空间中处理大量数据。

应该说,Dask.Bag 是一种集成了抽象数据结构 + 并行处理的数据类型,简而言之:

dask.bag = map, filter, toolz + 并行计算

相关文档

创建数据集

%run prep.py -d accounts

启动分布式调度器

from dask.distributed import Client

client = Client(n_workers=4)

Bag 创建

你可以从 Python 序列中、文件中、S3 文件中创建 Bag。并用 .take() 方法提取 Bag 中的块:

import dask.bag as db
# 元素是整数
b = db.from_sequence([1,2,3,4,5,6,7,8,9,10], npartitions=2)
b.take(3)
# 元素是一个 text 文件,其中文件的每一行都是一个 Json 对象
# 注意,自动解压的
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
b.take(1)

注意:数据被分割成块了,第一个例子中,序列被分为两块。第二个例子中,每一个文件作为一个块。

也可以获取远程数据,以下是从一个 s3fs 库中,读取远程的 CSV 文件并分块

import sources
sources.bag_url	
# Requires `s3fs` library
# each partition is a remote CSV text file
b = db.read_text(sources.bag_url,
                 storage_options={'anon': True})
b.take(1)

操作

Bag 对象兼用许多 Pyhon 基本库,如toolzpyspark中的 API,包括map, filter, groupby 等。

对 Bag 对象的运算将返回新的 Bag 对象。并调用 .compute() 方法触发执行(见手册第一章)

def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c
c.compute()

示例:账户 JSON 数据

我们已经在 data 文件夹下创建了一个 虚拟的 JSON 数据集,这些数据根据同一个 id 将条目打包在一起,作为一条记录。这些数据类似于文档存储数据库,或从 Web API 上获取的数据。

每一条数据都是一个 JSON,并用带有如下 key 的字典编码:

  • id: 用户的 id
  • name: 用户名
  • transactions: 转账记录,是一个包含 transaction-id, amount 的 list 对(list pair)
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
lines.take(3)

可以看到,数据是一行行的文本,且在读取文件时,自动进行的解压操作。为了让数据更加直观,我们可以用调用 map 方法,并用 json.load 函数:

import json
js = lines.map(json.loads)
# take: inspect first few elements
js.take(3)

基本查询

Map、filter、pluck

一旦将 JSON 数据表达为 Python 对象(dict 和 list)时,我们就能用 Python 函数来做一些简单的查询。

js.filter(lambda record: record['name'] == 'Alice').take(5)

def count_transactions(d):
    return {'name': d['name'], 'count': len(d['transactions'])}

# map: 非常类似于 Python 的BIF map()
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .take(5))

# pluck: select a field, as from a dictionary, element[field]
# pluck 函数从多个字典中,选择相同的 key 对应的 value
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .take(5))

# 计算 Alice 的平均交易量
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .mean()
   .compute())

flatten

用 .flattern() 方法,将嵌套数据用平均值解嵌套:

(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .take(3))
# 用 flatten()
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .take(3))
   
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .pluck('amount')
   .mean()
   .compute())

Groupby 和 Foldby

Groupby

通常我们需要用函数或键来对数据进行打包:为此,我们可以用 groupby 和 folder 实现。前者会对所有数据根据 group 打乱数据原有顺序,所以消耗资源较多。后者仅仅是顺序走一遍地同时记录结果,所以运行较快。

  • groupby: 对数据进行重新排序,进而使那些属于相同 group 的项目能在一个 键-值 对中。
  • foldby: 走一遍数据,同时收集结果

Note: groupby 的效率是非常低的,建议用 folder 或用 dataframe 的相关方法.

b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()  # names grouped by length

b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()

b.groupby(lambda x: x % 2).starmap(lambda k, v: (k, max(v))).compute()

foldby

Foldby 对初学者而言可能会很奇葩,实际上它和如下函数是类似的:

其参数大致如下:

  1. 用于 group 的函数
  2. 用于对每一个 group 降维的二进制操作器
  3. 用于结合不同块,但具有相同的 group 的数据

我们可以比较一下 groupby 和 foldby:

%%time
# 注意,需要等一会
result = js.groupby(lambda item: item['name']).starmap(lambda k, v: (k, len(v))).compute()
print(sorted(result))

结果用了 27 s

%%time
from operator import add
def incr(tot, _):
    return tot + 1

result = js.foldby(key='name', 
                   binop=incr, 
                   initial=0, 
                   combine=add, 
                   combine_initial=0).compute()
print(sorted(result))

仅用了 5.6 s。

DataFrame

就像 pandas.DataFrame 之于 Python 内置的数据结构一般,dask.dataframe 之于 dask.bag 也一样,都是更加统一、结构化且高效化。从 Bag 的角度来看,一旦数据可以转为 dataframe,那么那种复杂的 split-applly-combine 逻辑,就会更加直接和高效。

你可以将一个只包含元组、字典或列表的 bag,通过调用 .to_dataframe() 的方法,转换为一个 dask.dataframe:

df1 = js.to_dataframe()
df1.head()

可以看到,数据转换为一个 Dataframe。于是我们就可以像用 pandas 那样,对数据进行高效地处理了。

当使用 dask.dataframe.groupby() 时,会比 dask.bag.groupby() 少一个数量级的运行时间,但依旧比不上 dask.bag.foldby()

%time df1.groupby('name').id.count().compute().head()

运行时间为 8 s。

提高效率!

上述的 Dataframe 格式实际上还不是最优的。因为在 transaction 一列中,依旧是一个嵌套的字典。所以该列的数据类型为 object。而一般地,为了提高效率,我们通常要求数据类型为 int、float、string,因此,我们还需要进行进一步地处理:

def denormalize(record):
    # returns a list for each person, one item per transaction
    return [{'id': record['id'], 
             'name': record['name'], 
             'amount': transaction['amount'], 
             'transaction-id': transaction['transaction-id']}
            for transaction in record['transactions']]

transactions = js.map(denormalize).flatten()
transactions.take(3)
df = transactions.to_dataframe()
df.head()

缺点

Bag 提供了许多通用的计算,并兼用许多 Python 写的函数,但方便的同时也有代价:

  1. Bag 的运算普遍慢于 array 和 dataframe;就像 Python 内置类型慢于 Numpy 和 Pandas。
  2. Bag.groupby 很慢,而 Bag.foldby 很伤脑筋。如果有可能的话,还是选择 dataframe 的 groupby 吧。

学习资料

上一篇:如何更改 C# Record 构造函数的行为


下一篇:最长递增子序列(动态规划)