使用Apache Arrow助力PySpark数据处理

Apache Arrow从Spark 2.3版本开始被引入,通过列式存储,zero copy等技术,JVM 与Python 之间的数据传输效率得到了大量的提升。本文主要介绍一下Apache Arrow以及Spark中的使用方法。

列式存储简介

在介绍Spark中使用Apache Arrow之前,先简单的介绍一下Apache Arrow以及他背后的一些技术背景。
在大数据时代之前,大部分的存储引擎使用的是按行存储的形式,很多早期的系统,如交易系统、ERP系统等每次处理的是增、删、改、查某一个实体的所有信息,按行存储的话能够快速的定位到单个实体并进行处理。如果使用列存储,对某一个实体的不同属性的操作就需要进行多次随机读写,效率将会是非常差的。
随着大数据时代的到来,尤其是数据分析的不断发展,任务不需要一次读取实体的所有属性,而只关心特定的某些属性,并对这些属性进行aggregate等复杂的操作等。这种情况下行存储将需要读取额外的数据,形成瓶颈。而选择列存储将会减少额外数据的读取,对相同属性的数据还可以进行压缩,大大的加快了处理速度。
以下是行存储和列存储的对比说明,摘自Apache Arrow 官网,上面是一个二维表,由三个属性组成,分别是session_id, timestamp和source_ip。左侧为行存储在内存中的表示,数据按行依次存储,每一行按照列的顺序存储。右侧为列存储在内存中的表示,每一列单独存放,根据batch size等属性来控制一次写入的列簇大小。这样当查询语句只涉及少数列的时候,比如图中SQL查询,只需要过滤session_id列,避免读取所有数据列,减少了大量的I/O损耗,同时考虑到CPU pipeline以及使用CPU SIMD技术等等,将大大的提升查询速度。
使用Apache Arrow助力PySpark数据处理

Apache Arrow

在大数据领域,列式存储的灵感来自Google于2010年发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel论文中还介绍了Google如何使用这种存储格式实现并行查询的。这篇论文影响了Hadoop生态系统发展,之后的Apache Parquet和Apache ORC作为列式存储格式已经被广大的Hadoop生态系统使用,如Spark、Hive、Impala等等。
Apache Arrow在官网上是这样定义的,Apache Arrow是一个跨语言、跨平台的内存数据结构。从这个定义中我们可以看到Apache Arrow与Apache Parquet以及Apache ORC的区别。Parquet与ORC设计的目的针对磁盘数据,在列存储的基础上使用了高效率的压缩算法进行压缩,比如使用snappy、gzip和zlib等算法对列数据进行压缩。所以大部分情况下在数据读取的时候需要首先对数据进行反压缩,并有一定的cpu使用损耗。而Arrow,作为在内存中的数据,并不支持压缩(当然写入磁盘是支持压缩的),Arrow使用dictionary-encoded来进行类似索引的操作。
除了列存储外,Arrow在数据在跨语言的数据传输上具有相当大的威力,Arrow的跨语言特性表示在Arrow的规范中,作者指定了不同数据类型的layout,包括不同原始数据类型在内存中占的比特数,Array数据的组成以及Null值的表示等等。根据这些定义后,在不同的平台和不同的语言中使用Arrow将会采用完全相同的内存结构,因此在不同平台间和不同语言间进行高效数据传输成为了可能。在Arrow之前如果要对不同语言数据进行传输必须要使用序列化与反序列化技术来完成,耗费了大量的CPU资源和时间,而Arrow由于根据规范在内存中的数据结构一致,可以通过共享内存, 内存映射文件等技术来共享Arrow内存结构,省去了序列化与反序列过程。

Spark与Apache Arrow

介绍完Arrow的背景后,来看一下Apache Spark如何使用Arrow来加速PySpark处理的。一直以来,使用PySpark的客户都在抱怨python的效率太低,导致了很多用户转向了使用Scala进行开发。这主要是由于Spark使用Scala语言开发,底层启动的是JVM,而PySpark是Scala中PythonRDD对象启动的一个Python子进程,Python与JVM的通信使用了Py4J, 通过Py4J Python程序能够动态的访问JVM中的Java对象,这一过程使用了linux pipe,在底层JVM需要对RDD进行序列化,在Python端需要对RDD进行反序列化,当数据量较大的时候效率远不如直接使用Scala。流程如下图。
使用Apache Arrow助力PySpark数据处理
很多数据科学家以及分析人员习惯使用python来进行处理,尤其是使用Pandas和Numpy库来对数据进行后续处理,Spark 2.3以后引入的Arrow将会大大的提升这一效率。我们从代码角度来看一下实现,在Spark 2.4版本的dataframe.py代码中,toPandas的实现为:

          if use_arrow:
                try:
                    from pyspark.sql.types import _check_dataframe_convert_date, \
                        _check_dataframe_localize_timestamps
                    import pyarrow
                    batches = self._collectAsArrow()
                    if len(batches) > 0:
                        table = pyarrow.Table.from_batches(batches)
                        pdf = table.to_pandas()
                        pdf = _check_dataframe_convert_date(pdf, self.schema)
                        return _check_dataframe_localize_timestamps(pdf, timezone)
                    else:
                        return pd.DataFrame.from_records([], columns=self.columns)
                except Exception as e:
                    # We might have to allow fallback here as well but multiple Spark jobs can
                    # be executed. So, simply fail in this case for now.
                    msg = (
                        "toPandas attempted Arrow optimization because "
                        "'spark.sql.execution.arrow.enabled' is set to true, but has reached "
                        "the error below and can not continue. Note that "
                        "'spark.sql.execution.arrow.fallback.enabled' does not have an effect "
                        "on failures in the middle of computation.\n  %s" % _exception_message(e))
                    warnings.warn(msg)
                    raise

如果使用了Arrow(Spark 2.4默认使用),比较重要的一行是_collectAsArrow(),_collectAsArrow()实现为:

    def _collectAsArrow(self):
        """
        Returns all records as a list of ArrowRecordBatches, pyarrow must be installed
        and available on driver and worker Python environments.

        .. note:: Experimental.
        """
        with SCCallSiteSync(self._sc) as css:
            sock_info = self._jdf.collectAsArrowToPython()
        return list(_load_from_socket(sock_info, ArrowStreamSerializer()))

这里面使用了ArrowStreamSerializer(),而ArrowStreamSerializer定义为

class ArrowStreamSerializer(Serializer):
    """
    Serializes Arrow record batches as a stream.
    """

    def dump_stream(self, iterator, stream):
        import pyarrow as pa
        writer = None
        try:
            for batch in iterator:
                if writer is None:
                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                writer.write_batch(batch)
        finally:
            if writer is not None:
                writer.close()

    def load_stream(self, stream):
        import pyarrow as pa
        reader = pa.open_stream(stream)
        for batch in reader:
            yield batch

    def __repr__(self):
        return "ArrowStreamSerializer"

可以看出在这里面,jvm对数据根据Arrow规范设置好内存数据结构进行列式转化后,Python层面并不需要任何的反序列过程,而是直接读取,这也是Arrow高效的原因之一。
对比看那一下如果不使用Arrow方法为:

 def collect(self):
        """Returns all the records as a list of :class:`Row`.

        >>> df.collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        """
        with SCCallSiteSync(self._sc) as css:
            sock_info = self._jdf.collectToPython()
        return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

序列化方法为PickleSerializer,需要对每一条数据使用PickleSerializer进行反序列化。

那如何通过这一特性来进行我们的开发呢,Spark提供了Pandas UDFs功能,即向量化UDF,Pandas UDF主要是通过Arrow将JVM里面的Spark DataFrame传输给Python生成pandas DataFrame,并执行用于定义的UDF。目前有两种类型,一种是Scalar,一种是Grouped Map。
这里主要介绍一下Scalar Python UDFs的使用,以及可能的场景。 Scalar Python UDFs可以在select和withColumn中使用,他的输入参数为pandas.Series类型,输出参数为相同长度的pandas.Series。Spark内部会通过Arrow将列式数据根据batch size获取后,批量的将数据转化为pandas.Series类型,并在每个batch都执行用户定义的function。最后将不同batch的结果进行整合,获取最后的数据结果。
以下是官网的一个例子:

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

首先定义udf,multiply_func,主要功能就是将a、b两列的数据对应行数据相乘获取结果。然后通过pandas_udf装饰器生成Pandas UDF。最后使用df.selecct方法调用Pandas UDF获取结果。这里面要注意的是pandas_udf的输入输出数据是向量化数据,包含了多行,可以根据spark.sql.execution.arrow.maxRecordsPerBatch来设置。
可以看出Pandas UDF使用非常简单,只需要定义好Pandas UDF就可以了。有了Pandas UDF后我们可以很容易的将深度学习框架和Spark进行结合,比如在UDF中使用一些深度学习框架,比如scikit-learn,我们可以对批量的数据分别进行训练。下面是一个简单的例子,利用Pandas UDF来进行训练:

# Load necessary libraries
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd
from scipy.optimize import leastsq
import numpy as np

# Create the schema for the resulting data frame
schema = StructType([StructField('ID', LongType(), True),
                     StructField('p0', DoubleType(), True),
                     StructField('p1', DoubleType(), True)])
# Define the UDF, input and outputs are Pandas DFs
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def analyze_player(sample_pd):
    # return empty params in not enough data
    if (len(sample_pd.shots) <= 1):
        return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                                   'p0': [ 0 ], 'p1': [ 0 ]})
     
    # Perform curve fitting     
    result = leastsq(fit, [1, 0], args=(sample_pd.shots, 
                                  sample_pd.hits))
    # Return the parameters as a Pandas DF 
    return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                       'p0': [result[0][0]], 'p1': [result[0][1]]})
# perform the UDF and show the results 
player_df = df.groupby('player_id').apply(analyze_player)
display(player_df)

除此之外还可以使用TensorFlow和MXNet等与Spark进行融合,近期阿里云EMR Data Science集群将会推出相应的功能,整合EMR Spark与深度学习框架之间调度与数据交换功能,希望大家关注。

上一篇:Vitamio中文API文档(1)—— MediaStore


下一篇:go test 测试用例那些事