《使用Python和Dask实现分布式并行计算》5. Cleaning and transforming DataFrames(清洗和转换DataFrame)

楔子

对于任何数据科学项目而言,数据清理都是非常重要的一个环节,因为数据中的异常值会对统计分析产生负面的影响,从而导致我们得出错误的结论,最终可能建立起无法成立的机器学习模型。因此在数据的探索性分析之前,尽可能地清洗数据是很有必要。

在我们清洗数据时,你还会了解到Dask提供的许多操作DataFrame的方法,当然这些方法和pandas的DataFrame是非常类似的,可以说几乎没什么区别,因为Dask DataFrame就是由多个pandas DataFrame组成的。所以在介绍的时候,我们不会说的那么详细,其实如果你会pandas的话,这一章你几乎可以一目十行的浏览。不过有些操作虽然看起来相同,但由于Dask的分布式特性,我们还会看到这些相同的操作会有一些不同的表现、以及Dask如何处理这些差异。

这里还是使用之前的nyc停车罚单数据,当然你也可以使用其它的数据,不过最好还是保持一致。

import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import numpy as np
# 以上三个模块提前导入一下, 这里我们的代码都在jupyter notebook上运行

dtypes = {  # 共同的列以及合适的类型
    'Date First Observed': np.str,
    'Days Parking In Effect    ': np.str,
    'Double Parking Violation': np.str,
    'Feet From Curb': np.float32,
    'From Hours In Effect': np.str,
    'House Number': np.str,
    'Hydrant Violation': np.str,
    'Intersecting Street': np.str,
    'Issue Date': np.str,
    'Issuer Code': np.float32,
    'Issuer Command': np.str,
    'Issuer Precinct': np.float32,
    'Issuer Squad': np.str,
    'Issuing Agency': np.str,
    'Law Section': np.float32,
    'Meter Number': np.str,
    'No Standing or Stopping Violation': np.str,
    'Plate ID': np.str,
    'Plate Type': np.str,
    'Registration State': np.str,
    'Street Code1': np.uint32,
    'Street Code2': np.uint32,
    'Street Code3': np.uint32,
    'Street Name': np.str,
    'Sub Division': np.str,
    'Summons Number': np.uint32,
    'Time First Observed': np.str,
    'To Hours In Effect': np.str,
    'Unregistered Vehicle?': np.str,
    'Vehicle Body Type': np.str,
    'Vehicle Color': np.str,
    'Vehicle Expiration Date': np.str,
    'Vehicle Make': np.str,
    'Vehicle Year': np.float32,
    'Violation Code': np.uint16,
    'Violation County': np.str,
    'Violation Description': np.str,
    'Violation In Front Of Or Opposite': np.str,
    'Violation Legal Code': np.str,
    'Violation Location': np.str,
    'Violation Post Code': np.str,
    'Violation Precinct': np.float32,
    'Violation Time': np.str
}

# 这里的read_csv除了读取csv文件之外, 还可以读取txt文件
# 只要是分隔符文本文件, read_csv都是可以读取的, 只是需要注意分隔符
nyc_data_raw = dd.read_csv(r"C:\Users\satori\Desktop\nyc\*.csv", dtype=dtypes, usecols=dtypes.keys())
print(nyc_data_raw.npartitions)  # 142

注意:这一章内容非常简单,都是一些pandas DataFrame的一些基础操作,因为书中是默认你没有pandas基础的。但我这里则是认为你有pandas基础的,所以一些东西我介绍的就不那么详细了,比如:drop、rename等等,直接就一笔带过了。事实上,你也会认为我的决定是正确的。

使用索引和轴

之前我们了解到Dask DataFrame有三个结构元素:一个索引和两个轴,那么下面我们就来看看。

从DataFrame中选择指定的列

到目前为止,我们除了为每个列选择了适当的数据类型并读入到Dask DataFrame之外,我们还没有对nyc停车罚单数据进行过多的处理。那么下面就来学习如何使用DataFrame的索引和轴,熟悉了它们,我们就可以轻松地开始对数据的探索。

nyc_data_raw["Plate ID"].head()
"""
0    GBB9093
1    62416MB
2    78755JZ
3    63009MA
4    91648MC
Name: Plate ID, dtype: object
"""

如果我们想筛选某一列,那么直接通过字典的方式获取即可。

nyc_data_raw[["Plate ID", "Registration State"]].head()

《使用Python和Dask实现分布式并行计算》5. Cleaning and transforming DataFrames(清洗和转换DataFrame)

筛选多个列的话,那么通过一个列表传递即可,筛选单个列得到的是Series,筛选多个列得到的是DataFrame。如果想筛选单个列也得到DataFrame的话,那么也通过列表的方式传递即可,比如:nyc_data_raw[["Plate ID"]]

另外,如果是pandas DataFrame的话,那么还有一个filter方法,也是用来筛选指定列的。

def filter(
    self: FrameOrSeries,
    items=None,
    like: Optional[str] = None,
    regex: Optional[str] = None,
    axis=None,
) -> FrameOrSeries:

支持的筛选方式也很多,但是Dask DataFrame中没有这个方法,切记。

丢弃DataFrame的列

如果我们不想要某些列的话,我们可以手动输入要筛选的列,但是如果列非常多的话将会是一个可怕的数据量。而Dask DataFrame提供了drop方法,这个和pandas中的drop是一样的。

len(nyc_data_raw.columns), len(nyc_data_raw.drop(["Violation Code"], axis=1).columns)  # (43, 42)

这里需要指定axis=1,因为我们删除的是列,删除列的话显然是1轴的维度会发生变化,所以要指定axis=1表示对1轴进行操作。或者我个人更喜欢这么做:

len(nyc_data_raw.columns), len(nyc_data_raw.drop(columns=["Violation Code"]).columns)  # (43, 42)

通过关键字参数columns指定,这样就知道你删除的是列了,同理删除行是index。

另外,如果删除一个不存在的列的话会报错,这个时候可以通过errors参数来避免这一点。

nyc_data_raw.drop(columns=["xxx"], errors="ignore")  
# errors默认是"raise", 列不存在的话会报错
# 指定为"ignore"的话, 则会忽略这一点

另外,如果删除一个不存在的列的话会报错,这个时候可以通过errors参数来避免这一点。

关于筛选想要的列和删除不想要的列,这两者是等价的,关键是看在输入方面哪个更方便。

对DataFrame中的列重命名

如果你发现名字不合适,那么你可以对列进行重命名。

print("Plate Type" in nyc_data_raw.columns)  # True
print("Plate Type" in nyc_data_raw.rename(columns={"Plate Type": "Plate Type1"}))  # False

比较简单,这里不废话了。

从DataFrame中选择指定的行

书中介绍的是loc和iloc,我们演示一下吧。

nyc_data_raw.loc[100: 300, ["Plate Type", "Violation Code"]].head()

《使用Python和Dask实现分布式并行计算》5. Cleaning and transforming DataFrames(清洗和转换DataFrame)

注意:loc中筛选索引的时候支持切片操作(还有标量),如果是筛选指定的多个索引是不行的。比如:nyc_data_raw.loc[100: 102]可以,nyc_data_raw.loc[100]也可以,但是nyc_data_raw.loc[[100, 101, 102]]不行。

nyc_data_raw.iloc[:, [0, 33]].head()

《使用Python和Dask实现分布式并行计算》5. Cleaning and transforming DataFrames(清洗和转换DataFrame)

如果是iloc的话,那么只能选择列,也就是说使用iloc的话必须是df.iloc[:, column_indexer]形式。

处理缺失值

通常情况下,数据会含有缺失值,这个时候我们需要先对缺失值进行处理,而处理的办法有多种方式:

  • 删除数据中含有缺失值的行或者列
  • 给缺失值一个默认值
  • 推算缺失值

计算DataFrame中的缺失值

我们先来看看nyc停车罚单数据中有哪些列有缺失值:

# 默认计算每一列的缺失值数量,如果想计算每一行的缺失值数量的话, 只需在sum函数中传递axis=1即可
nyc_data_raw.isnull().sum().compute()
"""
Summons Number                              0
Plate ID                                 8835
Registration State                          0
Plate Type                                  0
Issue Date                                  0
Violation Code                              0
Vehicle Body Type                      239185
Vehicle Make                           275429
Issuing Agency                              0
Street Code1                                0
Street Code2                                0
Street Code3                                0
Vehicle Expiration Date                     1
Violation Location                    6411396
Violation Precinct                          1
Issuer Precinct                             1
Issuer Code                                 1
Issuer Command                        6358897
Issuer Squad                          6360470
Violation Time                           8132
Time First Observed                  38122805
Violation County                      4299524
Violation In Front Of Or Opposite     6754530
House Number                          7169114
Street Name                             23242
Intersecting Street                  30726552
Date First Observed                         3
Law Section                                 3
Sub Division                             5255
Violation Legal Code                 35975989
Days Parking In Effect                9833514
From Hours In Effect                 18976964
To Hours In Effect                   18976961
Vehicle Color                          487877
Unregistered Vehicle?                37463680
Vehicle Year                                5
Meter Number                         34344009
Feet From Curb                              5
Violation Post Code                  11233648
Violation Description                 4878815
No Standing or Stopping Violation    42339437
Hydrant Violation                    42339437
Double Parking Violation             42339437
dtype: int64
"""

删除含有缺失值的列

现在我们可以处理了,可以将含有缺失值的列删掉,当然我们这里是删除缺失值超过总量百分之50列,不然的话整个DataFrame就没有多少字段了。

# 默认计算每一列的缺失值数量,如果想计算每一行的缺失值数量的话, 只需在sum函数中传递axis=1即可
missing_percent = (nyc_data_raw.isnull().sum() / nyc_data_raw.index.size) * 100
nyc_data_clean_stage1 = nyc_data_raw.drop(columns=list(missing_percent[missing_percent >= 50].index))
nyc_data_raw.columns.difference(nyc_data_clean_stage1.columns)
"""
Index(['Double Parking Violation', 'Hydrant Violation', 'Intersecting Street',
       'Meter Number', 'No Standing or Stopping Violation',
       'Time First Observed', 'Unregistered Vehicle?', 'Violation Legal Code'],
      dtype='object')
"""

以上我们便删除了一部分列。

推测缺失值

如果列只包含很少的缺失值的话,那么我们不应该丢弃整个列,而是应该丢弃行。不过在此之前,我们可以对缺失值进行一个猜测,假设一个列中大量都是重复的,在去重之后只有三个不同的值,那么我们可以选择出现次数最多的值进行填充。尽管这个结论不总是正确的,但是可以最大程度地提高我们正确选择的概率。我们以"Vehicle Color"这一列为例:

# 计算该列中出现的值和对应数量
count_of_vehicle_colors = nyc_data_clean_stage1["Vehicle Color"].value_counts().compute()
print(count_of_vehicle_colors)
"""
GY       6280314
WH       6074770
WHITE    5624960
BK       5121030
BLACK    2758479
          ...   
MAU            1
MATOO          1
MATH           1
MARY           1
$RY            1
Name: Vehicle Color, Length: 5744, dtype: int64
"""
# 获取出现次数最多的元素
most_common_color = count_of_vehicle_colors.iloc[0]
# 填充,得到新的DataFrame,fillna如果接收一个标量是对所有的列进行填充,也可以传入一个字典对指定的列进行填充
nyc_data_clean_stage2 = nyc_data_clean_stage1.fillna({"Vehicle Color": most_common_color})

删除含有缺失值的行

现在我们已经对"Vehicle Color"这一列填充了缺失值,因为这一列是与颜色有关的,所以它肯定大部分都是重复的,因此我们选择了这种做法。还有一些有缺失值、但是比较少的列,我们可以直接删除它们的行。

# 找出缺失值在百分之0到百分之5之间的列
rows_to_drop = list(missing_percent[(missing_percent > 0) & (missing_percent < 5)].index)
# 删除它们的缺失值, 使用dropna函数, 该函数和pandas中DataFrame的dropna用法一样
# 里面可以指定一个how参数:默认为"any", 只要有一个列为缺失值、整行就删掉;为"all"的话表示所有列都是缺失值、改行才删掉
# 除此之外, 也可以指定axis, 默认为0、基于0轴操作。如果指定为1, 那么一旦有缺失值整个列就会被删掉
nyc_data_clean_stage3 = nyc_data_clean_stage2.dropna(subset=rows_to_drop)

用缺失值填充多个列

我们目前已经做完大部分的事了,最后就是用默认值填充缺少数据的其余列。但是有一点很重要,我们为列设置的默认值要符合该列的数据类型,我们先来看看还剩下哪些列。

# 找到缺失值在百分之5到百分之50的
remaining_columns_to_clean = list(missing_percent[(missing_percent >= 5) & (missing_percent < 50)].index)
# 获取它们的数据类型
nyc_data_raw.dtypes[remaining_columns_to_clean]
"""
Violation Location                   object
Issuer Command                       object
Issuer Squad                         object
Violation County                     object
Violation In Front Of Or Opposite    object
House Number                         object
Days Parking In Effect               object
From Hours In Effect                 object
To Hours In Effect                   object
Violation Post Code                  object
Violation Description                object
dtype: object
"""

我们剩下的列都是字符串类型,但是现实的都是object类型,你可能好奇为什么?答案是这只是一个装饰,Dask仅显示数值(int,float)等数据类型,任何非数值的类型都将显示为对象。

# 我们将使用字符串"Unknow"来填充剩余的列
unknown_default_dict = dict(map(lambda x: (x, "Unknow"), remaining_columns_to_clean))
nyc_data_clean_stage4 = nyc_data_clean_stage3.fillna(unknown_default_dict)

# 看看最后还没有缺失值
nyc_data_clean_stage4.isnull().sum().compute()
"""
Summons Number                       0
Plate ID                             0
Registration State                   0
Plate Type                           0
Issue Date                           0
Violation Code                       0
Vehicle Body Type                    0
Vehicle Make                         0
Issuing Agency                       0
Street Code1                         0
Street Code2                         0
Street Code3                         0
Vehicle Expiration Date              0
Violation Location                   0
Violation Precinct                   0
Issuer Precinct                      0
Issuer Code                          0
Issuer Command                       0
Issuer Squad                         0
Violation Time                       0
Violation County                     0
Violation In Front Of Or Opposite    0
House Number                         0
Street Name                          0
Date First Observed                  0
Law Section                          0
Sub Division                         0
Days Parking In Effect               0
From Hours In Effect                 0
To Hours In Effect                   0
Vehicle Color                        0
Vehicle Year                         0
Feet From Curb                       0
Violation Post Code                  0
Violation Description                0
dtype: int64
"""

此时我们就已经处理完了所有的缺失值,显然nyc_data_clean_stage4是我们后续需要使用DataFrame,那么我们是不是应该要将其进行持久化呢?答案是显然的。

nyc_final_data = nyc_data_clean_stage4.persist()

记录数据

和缺失值一样,数据中的一些实例虽然没有丢失,但是其有效性让人怀疑,这种情况也很常见。我们需要一种办法来清理这些异常的数据,至于这里面的异常数据是什么我们就不说了,直接看书上是怎么做的吧。

nyc_data_clean_stage4['Plate Type'].value_counts().compute()
"""
PAS    30452502
COM     7966914
OMT     1389341
SRF      394656
OMS      368952
         ...   
HOU           4
JWV           3
LOC           3
HIF           2
SNO           2
Name: Plate Type, Length: 90, dtype: int64
"""

我们看到大部分的值都是"PAS"和"COM",至于其它的值我们可以使用字符串"Other"来代替。

# 找到"Plate Type"的值在['PAS', 'COM']里面的记录
condition = nyc_final_data['Plate Type'].isin(['PAS', 'COM'])
# where:接收一个condition和value,将不满足condition的值替换成value
plate_type_masked = nyc_final_data['Plate Type'].where(condition, 'Other')
# 删除'Plate Type'这一列
nyc_data_recode_stage1 = nyc_data_clean_stage4.drop('Plate Type', axis=1)
# 我们看到pandas DataFrame中的assign在这里是支持的
nyc_data_recode_stage2 = nyc_data_recode_stage1.assign(PlateType=plate_type_masked)
# 不过原来的名字里面包含了一个空格, 这里再使用rename替换一下
nyc_data_recode_stage3 = nyc_data_recode_stage2.rename(columns={'PlateType':'Plate Type'})

然后我们再来看看里面的值:

nyc_data_recode_stage3['Plate Type'].value_counts().compute()
"""
PAS      30452502
COM       7966914
Other     3418586
Name: Plate Type, dtype: int64
"""

Nice,然后我们再来检查一下"Vehicle Color"这一列。

nyc_data_recode_stage3['Vehicle Color'].value_counts().compute()
"""
GY       6247250
WH       6027292
WHITE    5550821
BK       5096575
BLACK    2731591
          ...   
REK            1
GJR            1
GJOLD          1
GJA            1
JCCY           1
Name: Vehicle Color, Length: 5335, dtype: int64
"""

这个数据集包含超过5335种独特的颜色,但显然这是非常奇怪的,而有大部分颜色都只出现了一次,所以我们将只出现一次的颜色替换为"Other"。

# 找出只出现了一次的颜色
single_color = list(count_of_vehicle_colors[count_of_vehicle_colors == 1].index)
# 获取出现在里面的颜色
condition = nyc_data_clean_stage4['Vehicle Color'].isin(single_color)
# 注意:where是不满足condition的值才会被替换为"Other", 所以我们应该对condition取反, 或者将where换成mask
vehicle_color_masked = nyc_data_clean_stage4['Vehicle Color'].mask(condition, 'Other')
# 删除'Vehicle Color'这一列
nyc_data_recode_stage4 = nyc_data_recode_stage3.drop('Vehicle Color', axis=1)
# 将vehicle_color_masked这一列加进去,然后rename
nyc_data_recode_stage5 = nyc_data_recode_stage4.assign(VehicleColor=vehicle_color_masked)
nyc_data_recode_stage6 = nyc_data_recode_stage5.rename(columns={'VehicleColor':'Vehicle Color'})

Elementwise操作

下面了解一下DataFrame的apply操作。

from datetime import datetime
# apply也算是pandas中一个很常见的操作了,注意:该方法效率不高
# 但是注意里面meta参数,这是和pandas DataFrame中一个不同的地方,因为Dask会推断输出类型,但是根据我们之前说的。
# 最好不要让Dask来猜,而是我们显式地指定。
issue_date_parsed = nyc_data_recode_stage6['Issue Date'].apply(lambda x: datetime.strptime(x, "%m/%d/%Y"), meta=datetime)
# 删除Issue Date这一列
nyc_data_derived_stage1 = nyc_data_recode_stage6.drop('Issue Date', axis=1)
# 增加新列
nyc_data_derived_stage2 = nyc_data_derived_stage1.assign(IssueDate=issue_date_parsed)
# 最后rename
nyc_data_derived_stage3 = nyc_data_derived_stage2.rename(columns={'IssueDate':'Issue Date'}

下面我们我们来看一下结果:

nyc_data_derived_stage3['Issue Date'].head()
"""
0   2013-08-04
1   2013-08-04
2   2013-08-05
3   2013-08-05
4   2013-08-08
Name: Issue Date, dtype: datetime64[ns]
"""

此时这一列就不再是我们原来的字符串类型了,而是我们希望的时间类型,那么下面我们再根据这一列得到年月吧。

issue_date_month_year = nyc_data_derived_stage3['Issue Date'].apply(lambda dt: f"{dt: %Y%m}", meta=int)
nyc_data_derived_stage4 = nyc_data_derived_stage3.assign(IssueMonthYear=issue_date_month_year)
nyc_data_derived_stage5 = nyc_data_derived_stage4.rename(columns={'IssueMonthYear':'Citation Issued Month Year'})

# 查看一下结果
nyc_data_derived_stage5['Citation Issued Month Year'].head()
"""
0     201308
1     201308
2     201308
3     201308
4     201308
Name: Citation Issued Month Year, dtype: object
"""

完美,这正是我们想要的结果。不过使用pandas的你肯定发现了,整个流程虽然没有什么不妥,但是不是有点太麻烦了。直接通过df["col"] = df["col"].apply(lambda x: ...)这种方式不行吗?为什么非要先删除、再添加,然后再rename呢?因为书上就是这么写的,当然我们使用这种方式也是可以的,而且是最正确的选择。

我们之前说了,Dask DataFrame不支持insert,但是并不代表它就不支持本地修改,我们完全可以通过上面说的这种向量化赋值的操作实现。我们举个栗子:

print("Citation Issued Month Year" in nyc_data_derived_stage5.columns)  # True
nyc_data_derived_stage5.pop("Citation Issued Month Year")
print("Citation Issued Month Year" in nyc_data_derived_stage5.columns)  # False
nyc_data_derived_stage5["Citation Issued Month Year"] = issue_date_month_year
print("Citation Issued Month Year" in nyc_data_derived_stage5.columns)  # True

对DataFrame进行过滤和重索引

比较简单,主要是介绍&和|操作符将两个条件连接起来,没什么好说的。

对DataFrame进行join和合并

书中这里介绍的是join,但是说实话我个人不建议使用join,而是使用merge。这里我们使用简单的数据集进行模拟吧。先来看看合并

import dask.dataframe as dd
import pandas as pd

df1 = pd.DataFrame({"id": [1, 2, 3], "name": ["夏色祭", "白上吹雪", "神乐mea"]})
df2 = pd.DataFrame({"id": [4, 5], "name": ["碧居结衣", "神乐七奈"]})

dask_df1 = dd.from_pandas(df1, npartitions=1)
dask_df2 = dd.from_pandas(df2, npartitions=2)

print(dask_df1.append(df2).compute())
"""
   id   name
0   1    夏色祭
1   2   白上吹雪
2   3  神乐mea
0   4   碧居结衣
1   5   神乐七奈
"""
# 或者使用dd.concat
print(dd.concat([dask_df1, dask_df2]).compute())
"""
   id   name
0   1    夏色祭
1   2   白上吹雪
2   3  神乐mea
0   4   碧居结衣
1   5   神乐七奈
"""
# 如果是左右合并的话
print(dd.concat([dask_df1, dask_df2], axis=1).compute())
"""
   id   name      id    name
0   1   夏色祭    4.0   碧居结衣
1   2   白上吹雪   5.0   神乐七奈
2   3  神乐mea    NaN   NaN
"""
# 结果和我们想的也是一样的,只不过我们一般不会通过concat进行左右合并

再来看看merge:

import dask.dataframe as dd
import pandas as pd

df1 = pd.DataFrame({"id": [1, 2, 3], "name": ["夏色祭", "白上吹雪", "神乐mea"]})
df2 = pd.DataFrame({"id": [1, 3, 2], "age": [18, 38, 22]})

dask_df1 = dd.from_pandas(df1, npartitions=1)
dask_df2 = dd.from_pandas(df2, npartitions=2)

print(dd.merge(dask_df1, dask_df2, how="inner", on="id").compute())
"""
   id   name  age
0   1    夏色祭   18
1   2   白上吹雪   22
2   3  神乐mea   38
"""

一些高级操作

下面来介绍一下我个人在pandas中经常使用的一些操作吧,虽然也算不上高级,但是却很好用。书上说,很多操作Dask都不支持,但是我发现很多还是可以用的,可能这本书在写的时候Dask还没有提供这些方法,而在更新的时候加进去了。

combine_first

这是一个挺好用的方法,专门用来填充缺失值的。

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"old_id": ["1", "2", "3"], "new_id": ["1001", None, "1003"]})

dask_df = dd.from_pandas(df, npartitions=1)
dask_df["new_id"] = dask_df["new_id"].combine_first(dask_df["old_id"])
print(dask_df.compute())
"""
  old_id new_id
0      1   1001
1      2      2
2      3   1003
"""

看出它的用法了吗?df["a"].combine_first(df["b"]),如果"a"这一列不为空那么保留对应的值,否则使用"b"列中对应的值替换。

combine

它的功能比combine_first要更加丰富一些,比如:

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"a": [11, 2, 33], "b": [1, 22, 33]})

dask_df = dd.from_pandas(df, npartitions=1)
dask_df["c"] = dask_df["a"].combine(
    dask_df["b"],
    lambda x, y: x if x > y else y
)
print(dask_df.compute())
"""
    a   b   c
0  11   1  11
1   2  22  22
2  33  33  33
"""

df["a"].combine(df["b"], lambda x, y: ...),会将a列和b列中的每一个值都传递给x、y,执行逻辑得到返回值。所以我们上面combine_first完全可以使用combine代替,y if dd.isna(x) else x

当然还有to_datetime,这个没什么好说的,重点是里面的meta参数。此外Series对象下的str方法,在Dask中也是可以使用的。

将数据写入到文本文件中

我们处理完数据之后,是不是还要将数据写入到文件中呢?

nyc_data_with_temps.repartition(npartitions=1).to_csv('nyc-final-csv/part*.csv')

我们处理完数据之后,是不是还要将数据写入到文件中呢?但是注意:我们在文件名中指定了一个*号,它会被Dask指定为文件的分区号。由于我们将所有分区都合并为一个分区,所以它将输出一个part0.csv文件,但Dask是一个分布式第三方库,将数据分割成多个文件更有意义,这些文件可以被并行读取。

总结

  • 选择列可以使用方括号的形式,里面指定想要筛选的列。
  • head方法默认显示DataFrame的前10条数据,当然也可以在里面指定要显示的行数。
  • 可以使用drop方法从DataFrame中删除列,但是会得到一个新的DataFrame,原来的DataFrame不受影响。不过pandas DataFrame中drop默认情况下也是返回一个新的DataFrame,但是可以通过inplace参数本地修改,而Dask DataFrame不支持这个参数。
  • 可以通过dropna方法将缺失值从DataFrame中删除。
  • 可以使用drop-assign-rename模式替换DataFrame中的列,但是也可以通过向量化的方式直接替换。
  • 可以使用apply方法对一个列执行元素替换。
  • 支持使用>、<、==过滤数据,如果需要多个条件,可以使用numpy风格的布尔函数。
  • 两个DataFrame可以合并成一个DataFrame,通过append、concat。
  • 可以使用merge对两个数据集进行join。
上一篇:STM32F103C8软件仿真 报错 *** error 65: access violation at 0x40011800 : no 'read' permission


下一篇:iOS 未申请权限引起crash