信息来源于某位帅男 : 20道选择题,一题2分,2~3道大题:mapreduce求解,spark RDD,hdfs(选择题),hbase(数据表的选择设计问题,操作问题)
一、一些基本概念
1.python基础
# 1).单行注释用“#”,多行注释用一对‘’‘,或者"""包裹内容。
# 2).python的输入输出:
a = int(input())
b = int(input())
print("%d + %d = %d" % (a, b, a + b))
# 3).嵌套与循环:
--------------------------------
#判断语句
if x <= 0 :
x = -1
--------------------------------
#按索引遍历
for x in range(list):
print(list(x))
--------------------------------
#直接遍历值
for x in list:
print(x)
--------------------------------
#while循环
n = 5
while n > 0 :
print(" I love xr ! ")
n=n-1
--------------------------------
# 4).函数:
def move(x, y, step, angle=0):
nx = x + step * math.cos(angle)
ny = y - step * math.sin(angle)
return nx, ny
命令行调用运行结果如下:
>>> x, y = move(100, 100, 60, math.pi / 6)
>>> print(x, y)
151.96152422706632 70.0
# 5).列表list:
保留8位小数:list.append('{:.8}.format(4*pi)')
# 6).元组tuple:
# 7).pymysql链接数据库
# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host='localhost', port=3306,
user='root', passwd='root',
charset='utf8', db = 'mydb')
# 得到一个可以执行SQL语句的游标对象
cursor = conn.cursor()
# 定义要执行的SQL语句
sql = """
CREATE TABLE USER1 (
id INT auto_increment PRIMARY KEY ,
name CHAR(10) NOT NULL UNIQUE,
age TINYINT NOT NULL
)ENGINE=innodb DEFAULT CHARSET=utf8; #注意:charset='utf8' 不能写成utf-8
"""
# 执行SQL语句
cursor.execute(sql)
# 关闭游标对象
cursor.close()
# 关闭数据库连接
conn.close()
2.分布式文件系统HDFS
当数据集的大小超过一*立的物理计算机的存储能力时,就有必要对它进行分区并存储到若干台单独的计算机上,管理网络中跨多台计算机存储的文件系统称为分布式文件系统(Distributed FileSystem)。
Hadoop自带一个称为HDFS的分布式文件系统,即HDFS(Hadoop Distributed FileSystem)。有时也称之为DFS,他们是一回事儿。
NameNode与DataNode
HDFS有两类节点用来管理集群的数据,即一个namenode(管理节点)和多个datanode(工作节点)。namenode管理文件系统的命名空间,它维护着系统数及整棵树内所有的文件和目录,这些信息以两个形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件,namenode也记录着每个文件中各个块所在的数据节点信息,但它并不永久保存块的位置信息,因为这些信息会在系统启动时根据节点信息重建。
客户端(client)代表用户通过与namenode和datanode交互来访问整个系统。客户端提供一个类似POSIX(可移植操作系统界面)的文件系统结构,因此用户编程时无需知道namenode和datanode也可以实现功能。
datanode是文件系统的工作节点,他们根据需要存储并检索数据块(blocks),并且定期向namenode发送他们所存储的数据块的列表。
3.大数据计算框架MapReduce
什么是MapReduce?
个任务,任务是:挖掘分析我国气象中心近年来的数据日志,该数据日志大小有3T,让你分析计算出每一年的最高气温,如果你现在只有一台计算机,如何处理呢?我想你应该会读取这些数据,并且将读取到的数据与目前的最大气温值进行比较。比较完所有的数据之后就可以得出最高气温了。不过以我们的经验都知道要处理这么多数据肯定是非常耗时的。
如果我现在给你三台机器,你会如何处理呢?看到下图你应该想到了:最好的处理方式是将这些数据切分成三块,然后分别计算处理这些数据(Map),处理完毕之后发送到一台机器上进行合并(merge),再计算合并之后的数据,归纳(reduce)并输出。
这就是一个比较完整的MapReduce的过程了。
4.分布式数据库(Hbase)
# 启动hbase
start-hbase.sh
# 进入hbase shell 窗口
hbase shell
# 创建一个表
create 'test','data'
# 查看表是否创建成功
list
# 添加数据
put 'test','row1','data:1','value1'
put 'test','row2','data:2','value2'
# 获取数据
get 'test','row1'
# 查看所有的数据
scan 'test'
# 删除数据
delete 'test','row1'
# 删除表
disable 'test' #先将表设置为禁用
drop 'test'
1)Hbase分布式环境的整体架构
Zookeeper能为HBase提供协同服务,是HBase的一个重要组件,Zookeeper能实时的监控HBase的健康状态,并作出相应处理。
HMaster是HBase的主服务,他负责监控集群中所有的HRegionServer,并对表和Region进行管理操作,比如创建表,修改表,移除表等等。
HRegionServer是RegionServer的实例,它负责服务和管理多个HRegion 实例,并直接响应用户的读写请求。
HRegion是对表进行划分的基本单元,一个表在刚刚创建时只有一个Region,但是随着记录的增加,表会变得越来越大,HRegionServer会实时跟踪Region的大小,当Region增大到某个值时,就会进行切割(split)操作,由一个Region切分成两个Region。
总的来说,要部署一个分布式的HBase数据库,需要各个组件的协作,HBase通过Zookeeper进行分布式应用管理,Zookeeper相当于管理员,HBase将数据存储在HDFS(分布式文件系统)中,通过HDFS存储数据,所以我们搭建分布式的HBase数据库的整体思路也在这里,即将各个服务进行整合。
2). 搭建happybase环境,为编写python程序访问HBase中数据做好准备
happybase主要是用来操作hbase的,首先我们需要安装好happybse环境,然后启动hdfs和hbase,最后测试python-happybase即可。
使用happpybase连接HBase数据库:
import happybase
happybase.Connection(host=’localhost’, port=9090, timeout=None, autoconnect=True, table_prefix=None, table_prefix_separator=b’_’, compat=’0.98’, transport=’buffered’, protocol=’binary’)
获取连接实例
host:主机名
port:端口
timeout:超时时间
autoconnect:连接是否直接打开
table_prefix:用于构造表名的前缀
table_prefix_separator:用于table_prefix的分隔符
compat:兼容模式
transport:运输模式
protocol:协议
# 创建一个表
connection.create_table(
‘my_table’,
{
‘cf1’: dict(max_versions=10),
‘cf2’:dict(max_versions=1,block_cache_enabled=False),
‘cf3’: dict(), # use defaults
}
)
此时,我们再通过connection.tables()查看可以使用的table,结果为[‘my_table’]
创建的table即my_table包含3个列族:cf1、cf2、cf3
5.大数据计算框架Spark
1). 安装与配置Scala开发环境
为什么我们要安装配置scala呢?
因为Spark框架底层是使用Scala开发的,使用Scala写出的代码远比java简洁,因此安装与配置scala环境是我们在学习Spark之前要完成的准备工作。
2). Spark架构
同样如上,我们首先要知道Spark是什么?
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。我们知道mapReduce,Spark就是类Hadoop MapReduce的通用并行框架,Spark具有hadoop MapReduce的所有优点,甚至比它能够更好地使用与数据挖掘和机器学习等需要迭代的MapReduce的算法。
基本概念:
Application:用户编写的Spark应用程序,包含一个Driver和多个Executor。
Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与 ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。
Executor:是运行在工作节点WorkerNode的一个进程,负责运行 Task。
RDD:弹性分布式数据集,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
DAG:有向无环图,反映RDD之间的依赖关系。
Task:运行在Executor上的工作单元。
Job:一个Job包含多个RDD及作用于相应RDD上的各种操作。
Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表一组关联的,相互之间没有Shuffle依赖关系的任务组成的任务集。
Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型:
Standalon:Spark原生的资源管理,由Master负责资源的分配;
Apache Mesos:与Hadoop MR兼容性良好的一种资源调度框架;
Hadoop Yarn:主要是指Yarn中的ResourceManager。
3). Spark运行流程
- 构建Spark Application的运行环境,启动SparkContext;
- SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源;
- Executor向SparkContext申请Task;
- SparkContext构建DAG图,将DAG图分解成Stage 、并将 Stage封装成Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行;
- Task在Executor上运行,运行完释放所有资源。
4). RDD理解检测
在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用 RDD操作进行求值。简单的来说RDD就是一个集合,一个将集合中数据存储在不同机器上的集合。这么说清楚了吧?
不清楚就再来个例子,
# 如何使用集合并行化创建一个Spark RDD ?
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个1到8的列表List
data = [1,2,3,4,5,6,7,8]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
sum = rdd.collect()
# 5.打印 rdd 的内容
print(sum)
# 6.停止 SparkContext
sc.stop()
#********** End **********#
现在我们知道什么是RDD了,再来讲一讲它的五大特性。
- 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
- 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个 RDD都会实现 compute 函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
- RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
- 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
- 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
5). SparkSQL
我们开始编写 Spark SQL,从何开始呢?答案就是SparkSession。
SparkSession 是 Spark SQL 的入口。要创建基本的 SparkSession,只需使用SparkSession.builder()。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
有了 SparkSession,下一步就是创建 DataFrame
使用 SparkSession 可以从现有 RDD,Hive 表或 Spark 数据源(json,parquet,jdbc,orc,libsvm,csv,text)等格式文件创建DataFrame.
以下示例为读取 Json 文件创建 DataFrame。
df =spark.read.json("/people.json")
people.json 数据如下:
{“name”:“Michael”}
{“name”:“Andy”, “age”:30}
{“name”:“Justin”, “age”:19}
使用DataFrame
#打印Schema信息
df.printSchema()
#选择姓名列
df.select("name").show()
通过SQL语句的方式
#首先注册df为一个视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
# 写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
# 覆盖原有数据并保存到 F:\\test 路径下
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")
二、educoder原题
1.python基础
--------------------------------------------------------------
# 摄氏度的转换:华氏转摄氏度
def Table_For(min,max):
#请在此处用for循环打印列表
# 请在此添加实现代码 #
# ********** Begin *********#
print("华氏度\t\t摄氏度\t\t近似摄氏度")
print("****************************************")
for i in range(min, max + 10, 10):
b = (i - 32)/1.8
c = (i - 30) / 2
print("%d\t\t%.1f\t\t%.1f" % (i,b, c))
return 0
--------------------------------------------------------------
# 读取文件和将数据存储为文件
def solve(file):
sum = []
ans ={}
with open(file, encoding='utf-8') as file_obj:
lines = file_obj.readlines()
for line in lines[2:]:
k = line.rstrip().split('\t')
# # print(k)
sum.append(k)
print(sum)
solve('src/step1/constants.txt')
#********** End **********#
2.hdfs(选择题)
选择题?我觉得那可能就会考一些命令语句吧。
1). Linux下命令行
# 创建文件夹
mkdir /app
# 创建文件
touch hello.txt
# 切换到 /opt 目录下
cd /opt
# 解压压缩文件
tar -zxvf jdk-8u171-linux-x64.tar.gz
# 移动文件
mv jdk1.8.0_171/ /app
# 编辑配置文件
vim /etc/profile
# 使刚刚的配置生效
source /etc/profile
# 格式化HDFS文件
hadoop namenode -format
# 启动hadoop
start-dfs.sh
# 验证Hadoop
2). Hdfs下命令行H
# 在HDFS中创建文件夹
hadoop fs -mkdir /test
# 查看是否创建成功
hadoop fs -ls /
# 将文件上传至HDFS
hadoop fs -put hello.txt /test
# 查看文件
hadoop fs -cat /test/hello.txt
3.hbase(数据表的选择设计问题、操作问题)
1) 使用python代码向HBase表中并添加、删除数据,并查看数据
一、添加数据
table = connection.table(‘my_table’) #首先获得表对象
cloth_data = {'cf1:content': 'jeans', 'cf1:price': '299', 'cf1:rating': '98%'}
hat_data = {'cf1:content': 'cap', 'cf1:price': '88', 'cf1:rating': '99%'}
# 使用put一次只能存储一行数据,如果row key已经存在,则变成了修改数据
table.put(row='www.test1.com', data=cloth_data)
table.put(row='www.test2.com', data=hat_data)
# 使用batch一次插入多行数据
bat = table.batch()
bat.put('www.test5.com', {'cf1:price': 999, 'cf2:title': 'Hello Python', 'cf2:length': 34, 'cf3:code': 'A43'})
bat.put('www.test6.com', {'cf1:content': 'razor', 'cf1:price': 168, 'cf1:rating': '97%'})
bat.send()
# 使用上下文管理器来管理batch,这样就不用手动发送数据了,即不再需要bat.send()
with table.batch() as bat:
bat.put('www.test5.com', {'cf1:price': '999', 'cf2:title': 'Hello Python', 'cf2:length': '34', 'cf3:code': 'A43'})
bat.put('www.test6.com', {'cf1:content': u'剃须刀', 'cf1:price': '168', 'cf1:rating': '97%'})
二、删除数据
with table.batch() as bat:
bat.delete(‘www.test1.com')
三、检索数据
# 全局扫描一个table
for key, value in table.scan():
print key, value
# 检索一行数据
row = table.row(‘www.test4.com') print row
# 检索多行数据
rows = table.rows([‘www.test1.com', ‘www.test4.com'])print rows
4.mapReduce求解
1) 统计两个文本文件中,每个单词出现的次数。
# mapper.py
#! /usr/bin/python3
import sys
def main():
# 从标准输入流中接受数据行,对每一行调用mapper函数来处理
for line in sys.stdin:
line = line.strip()
mapper(line)
# 每行分割为一个个单词,用word表示
# hadoop streaming要求用"键\t值"形式输出键值对
def mapper(line):
words = line.split(' ')
for word in words:
if len(word.strip()) == 0:
continue
print("%s\t%s" % (word, 1))
if __name__ == '__main__':
main()
-------------------------------------------------------------
#Reduce.py
#! /usr/bin/python3
import sys
from operator import itemgetter
# 对values求和,并按"单词\t词频"的形式输出。
def reducer(k, values):
print("%s\t%s" % (k, sum(values)))
def main():
current_key = None
values = []
_key, _value = '', 0
for line in sys.stdin:
line = line.strip()
_key, _value = line.split('\t', 1)
_value = eval(_value)
if current_key == _key:
values.append(_value)
else:
if current_key:
reducer(current_key, values)
values = []
values.append(_value)
current_key = _key
# 不要忘记最后一个键值对
if current_key == _key:
reducer(current_key, values)
if __name__ == '__main__':
main()
2) 对两个文件进行合并,并剔除其中重复的内容
# mapper.py
#! /usr/bin/python3
import sys
def main():
for line in sys.stdin:
line = line.strip()
mapper(line)
def mapper(line):
########## Begin ###############
num,st = line.split(' ')
print("%s\t%s" %(num,st))
########### End #############
if __name__ == '__main__':
main()
--------------------------------------------------------------
# Reduce.py
#! /usr/bin/python3
import sys
def reducer(k, values):
############ Begin ################
for value in sorted(list(set(values))):
print("%s\t%s" %(k,value))
############ End ################
def main():
current_key = None
values = []
akey, avalue = None, None
for line in sys.stdin:
line = line.strip()
akey, avalue = line.split('\t')
if current_key == akey:
values.append(avalue)
else:
if current_key:
reducer(current_key, values)
values = []
values.append(avalue)
current_key = akey
if current_key == akey:
reducer(current_key, values)
if __name__ == '__main__':
main()
3) 求挖掘其中的父子辈关系,给出祖孙辈关系的表格
# mapper.py
#! /usr/bin/python3
import sys
def main():
for line in sys.stdin:
line = line.strip()
if line.startswith('child'):
pass
else:
mapper(line)
def mapper(line):
############### Begin ############
child, parent = line.split(' ')
print("%s\t-%s" % (child, parent))
print("%s\t+%s" % (parent, child))
############### End #############
if __name__ == '__main__':
main()
# Reduce.py
#! /usr/bin/python3
import sys
def reducer(k, values):
############## Begin ################
grandparents = []
grandson = []
for v in values:
if v.startswith('-'):
grandparents.append(v[1:])
else:
grandson.append(v[1:])
for i in grandson:
for j in grandparents:
print("%s\t%s" % (i, j))
############## End #################
def main():
current_key = None
values = []
akey, avalue = None, None
print("grand_child\tgrand_parent")
for line in sys.stdin:
line = line.strip()
try:
akey, avalue = line.split('\t')
except:
continue
if current_key == akey:
values.append(avalue)
else:
if current_key:
reducer(current_key, values)
values = []
values.append(avalue)
current_key = akey
if current_key == akey:
reducer(current_key, values)
if __name__ == '__main__':
main()
4) 数据清洗
#! /usr/bin/python3
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time
# 获取“省市代码:省市名称”项并保存在字典regions中;
# 获取“电话号码:姓名”项并保存在字典userphones中。
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()
def main():
# 正确输出utf-8编码的汉字
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
for line in sys.stdin:
line = line.strip()
mapper(line)
def mapper(line):
# 输出形如“邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市”的字符串
# 本题不需要reduce阶段,输出题目要求的内容即可,不需要使用“键\t值”的形式。
########## begin ##############
items = line.split(',')
caller = userphones.get(items[0])
receiver = userphones.get(items[1])
start_time = int(items[2])
end_time = int(items[3])
region_caller = regions.get(items[4])
region_receiver = regions.get(items[5])
print(caller,receiver,sep = ',',end = ',')
print(','.join(items[:2]), end = ',')
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)),end=',')
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)),end=',')
print(str(end_time - start_time), end = ',')
print(region_caller, region_receiver, sep = ',')
########### End #################
if __name__ == '__main__':
main()
---------------------------------------------------------------
# dbhelper.py
import pymysql
import sys
import codecs
class DBHelper:
def get_connection():
# 根据题目提供的凭据建立到mysql服务器的连接"conn",注意字符集指定为"utf8mb4"
######## Begin ############
conn = pymysql.connect(
host = 'localhost',
user = 'root',
password = '123123',
db = 'mydb',
port = 3306,
charset = 'utf8mb4'
)
######## End ############
return conn
@classmethod
def get_region(cls):
conn = cls.get_connection()
regions = dict()
with conn.cursor() as cur:
#从数据库中查询所有的省市代码和省市名称,并保存到字典regions中。
############ Begin ###################
sqltxt = 'select CodeNum, Address from allregion;'
cur.execute(sqltxt)
for row in cur.fetchall():
regions[row[0]] = row[1].strip()
############ End #################
conn.close()
return regions
@classmethod
def get_userphones(cls):
conn = cls.get_connection()
userphones = dict()
with conn.cursor() as cur:
#从数据库中查询所有的电话号码和对应的姓名,并保存到字典userphones中。
############ Begin ###################
sqltxt = 'select phone, trueName from userphone;'
cur.execute(sqltxt)
for row in cur.fetchall():
userphones[row[0]] = row[1]
############ End #################
conn.close()
return userphones
def main():
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
region = DBHelper.get_region()
users = DBHelper.get_userphones()
'''
for k, v in region.items():
print(k, ':', v)
print('-------------')
for k, v in users.items():
print(k, ':', v)
'''
if __name__ == '__main__':
main()
5.SparkRDD
Spark-submit计算圆周率
1).读取外部数据集创建RDD
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == '__main__':
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 文本文件 RDD 可以使用创建 SparkContext 的textFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
# 2.读取本地文件,URI为:/root/wordcount.txt
distFile = sc.textFile("/root/wordcount.txt")
# 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
s = distFile.collect()
# 4.打印 rdd 的内容
print(s)
# 5.停止 SparkContext
sc.stop()
#********** End **********#
2). 使用 map 算子,将偶数转换成该数的平方;奇数转换成该数的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个1到5的列表List
list1 = [1,2,3,4,5]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(list1)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
需求:
偶数转换成该数的平方
奇数转换成该数的立方
"""
# 5.使用 map 算子完成以上需求
rdd_map = rdd.map(lambda x : x*x if x%2 == 0 else x*x*x)
# 6.使用rdd.collect() 收集完成 map 转换的元素
print(rdd_map.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
3).使用 mapPartitions 算子,将字符串与该字符串的长度组合成一个元组。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
#********** Begin **********#
def f(iterator):
list1 = []
for x in iterator:
length = len(x)
list1.append((x,length))
return list1
#********** End **********#
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
data = ["dog", "salmon", "salmon", "rat", "elephant"]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
需求:
将字符串与该字符串的长度组合成一个元组,例如:
dog --> (dog,3)
salmon --> (salmon,6)
"""
# 5.使用 mapPartitions 算子完成以上需求
partitions = rdd.mapPartitions(f)
# 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
print(partitions.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
4).使用 filter 算子,过滤掉rdd(1, 2, 3, 4, 5, 6, 7, 8) 中的所有奇数。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个1到8的列表List
data = [1,2,3,4,5,6,7,8]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
需求:
过滤掉rdd中的奇数
"""
# 5.使用 filter 算子完成以上需求
rdd_filter = rdd.filter(lambda x:x%2==0)
# 6.使用rdd.collect() 收集完成 filter 转换的元素
print(rdd_filter.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
5) . 使用 flatMap 算子,合并RDD的元素:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
data = [[1,2,3],[4,5,6],[7,8,9]]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
需求:
合并RDD的元素,例如:
([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
"""
# 5.使用 filter 算子完成以上需求
flat_map = rdd.flatMap(lambda x:x)
# 6.使用rdd.collect() 收集完成 filter 转换的元素
print(flat_map.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
6). 使用 distinct 算子,将 rdd 中的数据进行去重。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
需求:
元素去重,例如:
1,2,3,3,2,1 --> 1,2,3
1,1,1,1, --> 1
"""
# 5.使用 distinct 算子完成以上需求
distinct = rdd.distinct()
# 6.使用rdd.collect() 收集完成 distinct 转换的元素
print(distinct.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
7). 使用 sortBy 算子,将 rdd 中的数据进行排序(升序)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
需求:
元素排序,例如:
5,4,3,1,2 --> 1,2,3,4,5
"""
# 5.使用 sortBy 算子完成以上需求
by = rdd.sortBy(lambda x: x)
# 6.使用rdd.collect() 收集完成 sortBy 转换的元素
print(by.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
8). 使用 sortByKey 算子,将 rdd 中的数据进行排序(升序)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
data=[('B',1),('A',2),('C',3)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
需求:
元素排序,例如:
[(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]
"""
# 5.使用 sortByKey 算子完成以上需求
key = rdd.sortByKey()
# 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
print(key.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
9). 使用mapValues算子,将偶数转换成该数的平方,奇数转换成该数的立方
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
需求:
元素(key,value)的value进行以下操作:
偶数转换成该数的平方
奇数转换成该数的立方
"""
# 5.使用 mapValues 算子完成以上需求
values = rdd.mapValues(lambda x: x*x if x%2==0 else x*x*x)
# 6.使用rdd.collect() 收集完成 mapValues 转换的元素
print(values.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
10).使用 reduceByKey 算子,将 rdd(key-value类型) 中的数据进行值累加。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
需求:
元素(key-value)的value累加操作,例如:
(1,1),(1,1),(1,2) --> (1,4)
(1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)
"""
# 5.使用 reduceByKey 算子完成以上需求
# ruduceBy = rdd.reduceByKey(lambda x,y:x+y).collect()
# 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
print(rdd.reduceByKey(lambda x,y:x+y).collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
11).Actions - 常用算子
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local", "Simple App")
# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
data = [1, 3, 5, 7, 9, 8, 6, 4, 2];
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.收集rdd的所有元素并print输出
print(rdd.collect())
# 5.统计rdd的元素个数并print输出
print(rdd.count())
# 6.获取rdd的第一个元素并print输出
print(rdd.first())
# 7.获取rdd的前3个元素并print输出
print(rdd.take(3))
# 8.聚合rdd的所有元素并print输出
print(rdd.reduce(lambda x,y:x+y))
# 9.停止 SparkContext
sc.stop()
# ********** End **********#
6. SparkSQL
1). 使用Spark SQL统计战斗机飞行性能
# coding=utf-8
from pyspark.sql import SparkSession
#**********Begin**********#
#创建SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.sql.crossJoin.enabled", "true") \
.master("local") \
.getOrCreate()
#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json")
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球飞行速度排名前三的战斗机
sqlDF = spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) as SPEED, `名称` from table1 order by SPEED desc LIMIT 3")
#保存结果
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#
spark.stop()
2). 使用Spark SQL统计各个研发单位研制战斗机占比
# coding=utf-8
from pyspark.sql import SparkSession
#**********Begin**********#
#创建SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.sql.crossJoin.enabled", "true") \
.master("local") \
.getOrCreate()
#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
sqlDF = spark.sql("select concat(cast(round(count(`研发单位`)*100/(select count(`研发单位`) from table1 where `研发单位` is not null and `名称` is not null ),2) as float),'%'),`研发单位` from table1 where `研发单位` is not null and `名称` is not null group by `研发单位`")
#保存结果
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#
spark.stop()
3).出租车轨迹数据清洗
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
#**********begin**********#
df = spark.read.option("header", True).option("delimiter", "\t").csv("/root/data.csv")
df.createOrReplaceTempView("table1")
DataFrame = spark.sql(
"""
select\
regexp_replace(TRIP_ID,'[$@]','') TRIP_ID,regexp_replace(CALL_TYPE,'[$@]','') CALL_TYPE,regexp_replace(ORIGIN_CALL,'[$@]','') ORIGIN_CALL,regexp_replace(TAXI_ID,'[$@]','') TAXI_ID,regexp_replace(ORIGIN_STAND,'[$@]','') ORIGIN_STAND,regexp_replace(TIMESTAMP,'[$@]','') TIMESTAMP, regexp_replace(POLYLINE,'[$@,-.\\\\[\\\\]]','') POLYLINE\
from table1
""")
DataFrame.show()
#**********end**********#
spark.stop()