Apache Spark是目前处理和使用大数据的最广泛使用的框架之一,Python是数据分析,机器学习等最广泛使用的编程语言之一。那么,为什么不一起使用它们呢?这就是Spark与python也被称为PySpark的原因。
Apache Spark开发人员每年的平均年薪为110,000美元。毫无疑问,Spark在这个行业中已经被广泛使用。由于其丰富的库集,Python今天被大多数数据科学家和分析专家使用。
将Python与Spark集成是开源社区的主要礼物。 Spark是用Scala语言开发的,与Java非常相似。它将程序代码编译为用于Spark大数据处理的JVM的字节码。为了支持Spark和Python,Apache Spark社区发布了PySpark。在本文中,我们将讨论以下主题:
1、Apache Spark简介及其功能
2、为什么选择Python?
3、使用Python设置Spark(PySpark)
4、PySpark SparkContext和数据流
5、PySpark KDD用例
Apache Spark是Apache Software Foundation开发的用于实时处理的开源集群计算框架。 Spark提供了一个接口,用于编程具有隐式数据并行和容错功能的集群。
下面是Apache Spark的一些特性,它比其它的大数据框架的优势在于:
1、速度:比传统的大型数据处理框架快100倍。
2、强大的缓存:简单的编程层提供了强大的缓存和磁盘持久性功能。
3、部署:可以通过Mesos,通过Yarn的Hadoop或Spark自己的集群管理器进行部署。
4、实时:由于内存中的计算,实时计算和低延迟。
5、多语言:这是该框架最重要的特性之一,因为它可以在Scala,Java,Python和R语言中编程。
虽然Spark是在Scala中设计的,但它的速度比Python快10倍,但只有当使用的内核数量少时,Scala才会更快。由于现在大多数分析和处理都需要大量内核,因此Scala的性能优势并不大。
对于程序员来说,由于其语法和标准库,Python相对来说更容易学习。 而且,它是一种动态类型语言,这意味着RDD可以保存多种类型的对象。
尽管Scala拥有SparkMLlib,但它没有足够的库和工具来实现机器学习和NLP目的。 此外,Scala缺乏数据可视化。
使用Python设置Spark(PySpark)
我们应该如何下载Spark并安装它,当你已经解压缩了spark文件,安装它并将其添加到.bashrc文件的路径中,输入:source .bashrc
export SPARK_HOME = /usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/usr/lib/hadoop/spark-2.1.0-bin-hadoop2.7/bin
要打开PySpark shell,输入命令:./bin/pyspark
Apache Spark由于它具有令人惊叹的功能,如内存处理,polyglot和快速处理等,被许多公司用于各种行业:
Yahoo!使用Apache Spark的机器学习功能来个性化其新闻和网页以及推荐式广告。使用Spark和Python来找出哪些新闻用户有兴趣阅读和分类新闻报道,以找出哪类用户有兴趣阅读哪些新闻类别。
TripAdvisor使用Apache Spark通过比较数百个网站为其客户找到最佳酒店价格,向数百万旅客提供建议。以可读格式阅读和处理酒店评论所需的时间是在Apache Spark的帮助下完成的。
阿里巴巴运营着全球最大的Apache Spark集群,以便在其电子商务平台上分析数百PB以上的数据。
PySpark SparkContext与数据流
用Python来连接Spark,使用RD4s可以通过库Py4j来实现。 PySpark Shell将Python API链接到Spark Core并初始化Spark Context。 Spark上下文是任何Spark应用程序的核心。
1、Spark Context设置内部服务并建立到Spark执行环境的连接。
2、驱动程序中的Spark Context对象协调所有分布式进程并允许资源分配。
3、集群管理器提供执行程序,它们是具有逻辑的JVM进程。
4、Spark Context对象将应用程序发送给执行者。
5、Spark Context在每个执行器中执行任务。
PySpark KDD用例
现在让我们来看一个用例:KDD'99 Cup(国际知识发现和数据挖掘工具竞赛)。 这里我们将取数据集的一部分,因为原始数据集太大。
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
创建 RDD:
现在我们用这个下载的文件来创建RDD。
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
过滤
假设我们要计算在数据集中有多少正常的相互作用。 我们可以按如下过滤raw_data RDD。
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))
统计:
现在我们来计算新RDD中有多少元素:
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print "There are {} 'normal' interactions".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))
输出结果如下:
There are 97278 'normal' interactions
Count completed in 5.951 seconds
映射:
在这种情况下,我们想要将数据文件作为CSV格式文件读取。 我们可以通过对RDD中的每个元素应用lambda函数来做到这一点,如下所示。 这里我们使用map()和take()函数转换。
from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
pprint(head_rows[0])
输出结果:
Parse completed in 1.715 seconds
[u'0',
u'tcp',
u'http',
u'SF',
u'181',
u'5450',
u'0',
u'0',
.
.
u'normal.']
拆分:
现在我们希望将RDD中的每个元素都作为键值对(其中键是标记)(例如normal),并且该值是表示CSV格式文件中的行的整个元素列表。可以按如下进行, 这里我们用line.split()和map()函数。
def parse_interaction(line):
elems = line.split(",")
tag = elems[41]
return (tag, elems)
key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])
输出结果如下:
(u'normal.',
[u'0',
u'tcp',
u'http',
u'SF',
u'181',
u'5450',
u'0',
u'0',
u'0.00',
u'1.00',
.
.
.
.
u'normal.'])
收集行为:
这里我们将使用collect()行为。 它会将RDD的所有元素存入内存。 因此,使用大型RDD时必须小心使用。
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))
输出结果如下:
Data collected in 17.927 seconds
当然,这比我们之前使用的其他任何动作花费的时间要长。 每个具有RDD片段的Spark工作节点都必须进行协调,以便检索其部分,然后将所有内容缩小到一起。
作为结合前面所有内容的最后一个例子,我们希望收集所有常规交互作为键值对。
# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)
# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)
# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")
# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print "Data collected in {} seconds".format(round(tt,3))
print "There are {} 'normal' interactions".format(normal_count)
输出结果如下 :
Data collected in 12.485 seconds
There are 97278 normal interactions
希望你喜欢Python这篇文章,如果已经在阅读完全部内容,恭喜你不再是PySpark的新手了。
周末愉快。