大数据期末总结复习

信息来源于某位帅男 : 20道选择题,一题2分,2~3道大题:mapreduce求解,spark RDD,hdfs(选择题),hbase(数据表的选择设计问题,操作问题)

一、一些基本概念

1.python基础

# 1).单行注释用“#”,多行注释用一对‘’‘,或者"""包裹内容。
# 2).python的输入输出:
	a = int(input())
    b = int(input())
	print("%d + %d = %d" % (a, b, a + b))
# 3).嵌套与循环:
--------------------------------
	#判断语句
	if x <= 0 :
		x = -1
--------------------------------
	#按索引遍历
	for x in range(list):
		print(list(x))
--------------------------------
	#直接遍历值
	for x in list:
		print(x)
--------------------------------
	#while循环
	n = 5
	while n > 0 :
		print(" I love xr ! ")
		n=n-1
--------------------------------
# 4).函数:
	def move(x, y, step, angle=0):
    nx = x + step * math.cos(angle)
    ny = y - step * math.sin(angle)
    return nx, ny
    
    命令行调用运行结果如下:
    >>> x, y = move(100, 100, 60, math.pi / 6)
	>>> print(x, y)
	151.96152422706632 70.0
# 5).列表list:
	保留8位小数:list.append('{:.8}.format(4*pi)')

大数据期末总结复习

# 6).元组tuple:

大数据期末总结复习

# 7).pymysql链接数据库
	# 导入pymysql模块
	import pymysql
	# 连接database
	conn = pymysql.connect(host='localhost', port=3306,
                       user='root', passwd='root',
                       charset='utf8', db = 'mydb')
	# 得到一个可以执行SQL语句的游标对象
	cursor = conn.cursor()
	# 定义要执行的SQL语句
	sql = """
	CREATE TABLE USER1 (
		id INT auto_increment PRIMARY KEY ,
		name CHAR(10) NOT NULL UNIQUE,
		age TINYINT NOT NULL
	)ENGINE=innodb DEFAULT CHARSET=utf8;  #注意:charset='utf8' 不能写成utf-8
		"""
	# 执行SQL语句
	cursor.execute(sql)
	# 关闭游标对象
	cursor.close()
	# 关闭数据库连接
	conn.close()

2.分布式文件系统HDFS

当数据集的大小超过一*立的物理计算机的存储能力时,就有必要对它进行分区并存储到若干台单独的计算机上,管理网络中跨多台计算机存储的文件系统称为分布式文件系统(Distributed FileSystem)。

Hadoop自带一个称为HDFS的分布式文件系统,即HDFS(Hadoop Distributed FileSystem)。有时也称之为DFS,他们是一回事儿。

NameNode与DataNode
HDFS有两类节点用来管理集群的数据,即一个namenode(管理节点)和多个datanode(工作节点)。namenode管理文件系统的命名空间,它维护着系统数及整棵树内所有的文件和目录,这些信息以两个形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件,namenode也记录着每个文件中各个块所在的数据节点信息,但它并不永久保存块的位置信息,因为这些信息会在系统启动时根据节点信息重建。

客户端(client)代表用户通过与namenode和datanode交互来访问整个系统。客户端提供一个类似POSIX(可移植操作系统界面)的文件系统结构,因此用户编程时无需知道namenode和datanode也可以实现功能。
datanode是文件系统的工作节点,他们根据需要存储并检索数据块(blocks),并且定期向namenode发送他们所存储的数据块的列表。

大数据期末总结复习

3.大数据计算框架MapReduce

什么是MapReduce?
个任务,任务是:挖掘分析我国气象中心近年来的数据日志,该数据日志大小有3T,让你分析计算出每一年的最高气温,如果你现在只有一台计算机,如何处理呢?我想你应该会读取这些数据,并且将读取到的数据与目前的最大气温值进行比较。比较完所有的数据之后就可以得出最高气温了。不过以我们的经验都知道要处理这么多数据肯定是非常耗时的。

如果我现在给你三台机器,你会如何处理呢?看到下图你应该想到了:最好的处理方式是将这些数据切分成三块,然后分别计算处理这些数据(Map),处理完毕之后发送到一台机器上进行合并(merge),再计算合并之后的数据,归纳(reduce)并输出。

这就是一个比较完整的MapReduce的过程了。

4.分布式数据库(Hbase)

# 启动hbase
start-hbase.sh
# 进入hbase shell 窗口
hbase shell
# 创建一个表
create 'test','data'
# 查看表是否创建成功
list
# 添加数据
put 'test','row1','data:1','value1'
put 'test','row2','data:2','value2'
# 获取数据
get 'test','row1'
# 查看所有的数据
scan 'test'

# 删除数据
delete 'test','row1'
# 删除表
disable 'test' #先将表设置为禁用
drop 'test'
1)Hbase分布式环境的整体架构

大数据期末总结复习

Zookeeper能为HBase提供协同服务,是HBase的一个重要组件,Zookeeper能实时的监控HBase的健康状态,并作出相应处理。

HMaster是HBase的主服务,他负责监控集群中所有的HRegionServer,并对表和Region进行管理操作,比如创建表,修改表,移除表等等。

HRegionServer是RegionServer的实例,它负责服务和管理多个HRegion 实例,并直接响应用户的读写请求。

HRegion是对表进行划分的基本单元,一个表在刚刚创建时只有一个Region,但是随着记录的增加,表会变得越来越大,HRegionServer会实时跟踪Region的大小,当Region增大到某个值时,就会进行切割(split)操作,由一个Region切分成两个Region。

总的来说,要部署一个分布式的HBase数据库,需要各个组件的协作,HBase通过Zookeeper进行分布式应用管理,Zookeeper相当于管理员,HBase将数据存储在HDFS(分布式文件系统)中,通过HDFS存储数据,所以我们搭建分布式的HBase数据库的整体思路也在这里,即将各个服务进行整合。

2). 搭建happybase环境,为编写python程序访问HBase中数据做好准备

happybase主要是用来操作hbase的,首先我们需要安装好happybse环境,然后启动hdfs和hbase,最后测试python-happybase即可。

使用happpybase连接HBase数据库:
import happybase
happybase.Connection(host=’localhost’, port=9090, timeout=None, autoconnect=True, table_prefix=None, table_prefix_separator=b’_’, compat=’0.98’, transport=’buffered’, protocol=’binary’)
获取连接实例
host:主机名
port:端口
timeout:超时时间
autoconnect:连接是否直接打开
table_prefix:用于构造表名的前缀
table_prefix_separator:用于table_prefix的分隔符
compat:兼容模式
transport:运输模式
protocol:协议

# 创建一个表
connection.create_table(
    ‘my_table’,
    {
        ‘cf1’: dict(max_versions=10),
        ‘cf2’:dict(max_versions=1,block_cache_enabled=False),
        ‘cf3’: dict(),  # use defaults
    }
)
此时,我们再通过connection.tables()查看可以使用的table,结果为[‘my_table’]
创建的table即my_table包含3个列族:cf1、cf2、cf3

5.大数据计算框架Spark

1). 安装与配置Scala开发环境

为什么我们要安装配置scala呢?
因为Spark框架底层是使用Scala开发的,使用Scala写出的代码远比java简洁,因此安装与配置scala环境是我们在学习Spark之前要完成的准备工作。

2). Spark架构

同样如上,我们首先要知道Spark是什么?
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。我们知道mapReduce,Spark就是类Hadoop MapReduce的通用并行框架,Spark具有hadoop MapReduce的所有优点,甚至比它能够更好地使用与数据挖掘和机器学习等需要迭代的MapReduce的算法。

大数据期末总结复习

基本概念:

Application:用户编写的Spark应用程序,包含一个Driver和多个Executor。

Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与 ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。

Executor:是运行在工作节点WorkerNode的一个进程,负责运行 Task。

RDD:弹性分布式数据集,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。

DAG:有向无环图,反映RDD之间的依赖关系。

Task:运行在Executor上的工作单元。

Job:一个Job包含多个RDD及作用于相应RDD上的各种操作。

Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表一组关联的,相互之间没有Shuffle依赖关系的任务组成的任务集。

Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型:

Standalon:Spark原生的资源管理,由Master负责资源的分配;
Apache Mesos:与Hadoop MR兼容性良好的一种资源调度框架;
Hadoop Yarn:主要是指Yarn中的ResourceManager。

3). Spark运行流程

大数据期末总结复习

  1. 构建Spark Application的运行环境,启动SparkContext;
  2. SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源;
  3. Executor向SparkContext申请Task;
  4. SparkContext构建DAG图,将DAG图分解成Stage 、并将 Stage封装成Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行;
  5. Task在Executor上运行,运行完释放所有资源。

4). RDD理解检测

在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用 RDD操作进行求值。简单的来说RDD就是一个集合,一个将集合中数据存储在不同机器上的集合。这么说清楚了吧?
不清楚就再来个例子,

# 如何使用集合并行化创建一个Spark RDD ?
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个1到8的列表List
    data = [1,2,3,4,5,6,7,8]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
    sum = rdd.collect()

    # 5.打印 rdd 的内容
    print(sum)

    # 6.停止 SparkContext
    sc.stop()
    #********** End **********#

现在我们知道什么是RDD了,再来讲一讲它的五大特性。

  1. 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
  1. 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个 RDD都会实现 compute 函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
  1. RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  1. 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  1. 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

5). SparkSQL

我们开始编写 Spark SQL,从何开始呢?答案就是SparkSession。
SparkSession 是 Spark SQL 的入口。要创建基本的 SparkSession,只需使用SparkSession.builder()。

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

有了 SparkSession,下一步就是创建 DataFrame
使用 SparkSession 可以从现有 RDD,Hive 表或 Spark 数据源(json,parquet,jdbc,orc,libsvm,csv,text)等格式文件创建DataFrame.
以下示例为读取 Json 文件创建 DataFrame。

df =spark.read.json("/people.json")

people.json 数据如下:
{“name”:“Michael”}
{“name”:“Andy”, “age”:30}
{“name”:“Justin”, “age”:19}

使用DataFrame

#打印Schema信息
df.printSchema()
#选择姓名列
df.select("name").show()

通过SQL语句的方式
#首先注册df为一个视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()

# 写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
# 覆盖原有数据并保存到 F:\\test 路径下
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")

二、educoder原题

1.python基础

--------------------------------------------------------------
# 摄氏度的转换:华氏转摄氏度
def Table_For(min,max):
    #请在此处用for循环打印列表
    #   请在此添加实现代码   #
    # ********** Begin *********#
    print("华氏度\t\t摄氏度\t\t近似摄氏度")
    print("****************************************")
    for i in range(min, max + 10, 10):
        b = (i - 32)/1.8
        c = (i - 30) / 2
        print("%d\t\t%.1f\t\t%.1f" % (i,b, c))
    return 0
--------------------------------------------------------------
# 读取文件和将数据存储为文件
def solve(file):
    sum = []
    ans ={}
    with open(file, encoding='utf-8') as file_obj:
        lines = file_obj.readlines()
    for line in lines[2:]:
        k = line.rstrip().split('\t')
#        # print(k)
        sum.append(k)
    print(sum)
 solve('src/step1/constants.txt')
#********** End **********#

2.hdfs(选择题)

选择题?我觉得那可能就会考一些命令语句吧。

1). Linux下命令行

# 创建文件夹
mkdir /app
# 创建文件
touch hello.txt
# 切换到 /opt 目录下
cd /opt
# 解压压缩文件
tar -zxvf jdk-8u171-linux-x64.tar.gz
# 移动文件
mv jdk1.8.0_171/ /app
# 编辑配置文件
vim /etc/profile
# 使刚刚的配置生效
source /etc/profile

# 格式化HDFS文件
hadoop namenode -format
# 启动hadoop
start-dfs.sh
# 验证Hadoop

大数据期末总结复习

2). Hdfs下命令行H

# 在HDFS中创建文件夹
hadoop fs -mkdir /test
# 查看是否创建成功
hadoop fs -ls /
# 将文件上传至HDFS
hadoop fs -put hello.txt /test
# 查看文件
hadoop fs -cat /test/hello.txt

大数据期末总结复习

3.hbase(数据表的选择设计问题、操作问题)

1) 使用python代码向HBase表中并添加、删除数据,并查看数据
一、添加数据
table = connection.table(‘my_table’)       #首先获得表对象

cloth_data = {'cf1:content': 'jeans', 'cf1:price': '299', 'cf1:rating': '98%'}
hat_data = {'cf1:content': 'cap', 'cf1:price': '88', 'cf1:rating': '99%'}
# 使用put一次只能存储一行数据,如果row key已经存在,则变成了修改数据
table.put(row='www.test1.com', data=cloth_data)
table.put(row='www.test2.com', data=hat_data)
# 使用batch一次插入多行数据
bat = table.batch()
bat.put('www.test5.com', {'cf1:price': 999, 'cf2:title': 'Hello Python', 'cf2:length': 34, 'cf3:code': 'A43'})
bat.put('www.test6.com', {'cf1:content': 'razor', 'cf1:price': 168, 'cf1:rating': '97%'})
bat.send()
# 使用上下文管理器来管理batch,这样就不用手动发送数据了,即不再需要bat.send()
with table.batch() as bat:
    bat.put('www.test5.com', {'cf1:price': '999', 'cf2:title': 'Hello Python', 'cf2:length': '34', 'cf3:code': 'A43'})
    bat.put('www.test6.com', {'cf1:content': u'剃须刀', 'cf1:price': '168', 'cf1:rating': '97%'})
二、删除数据
with table.batch() as bat:
bat.delete(‘www.test1.com')
三、检索数据
 # 全局扫描一个table
 for key, value in table.scan():
    print key, value
 # 检索一行数据
 row = table.row(‘www.test4.com') print row
 # 检索多行数据
rows = table.rows([‘www.test1.com', ‘www.test4.com'])print rows

4.mapReduce求解

1) 统计两个文本文件中,每个单词出现的次数。
# mapper.py
#! /usr/bin/python3
import sys
def main():
    # 从标准输入流中接受数据行,对每一行调用mapper函数来处理
    for line in sys.stdin:
        line = line.strip()
        mapper(line)
# 每行分割为一个个单词,用word表示
# hadoop streaming要求用"键\t值"形式输出键值对
def mapper(line):
    words = line.split(' ')
    for word in words:
        if len(word.strip()) == 0:
            continue
        print("%s\t%s" % (word, 1))
if __name__ == '__main__':
    main()
-------------------------------------------------------------
#Reduce.py
#! /usr/bin/python3
import sys
from operator import itemgetter
# 对values求和,并按"单词\t词频"的形式输出。
def reducer(k, values):
    print("%s\t%s" % (k, sum(values)))
def main():
    current_key = None
    values = []
    _key, _value = '', 0
    for line in sys.stdin:
        line = line.strip()
        _key, _value = line.split('\t', 1)
        _value = eval(_value)
        if current_key == _key:
            values.append(_value)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(_value)
            current_key = _key
    # 不要忘记最后一个键值对
    if current_key == _key:
        reducer(current_key, values)
if __name__ == '__main__':
    main()
2) 对两个文件进行合并,并剔除其中重复的内容
# mapper.py
#! /usr/bin/python3
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    ########## Begin  ###############
    num,st = line.split(' ')
    print("%s\t%s" %(num,st))
   ###########  End    #############
if __name__ == '__main__':
    main()
--------------------------------------------------------------
# Reduce.py
#! /usr/bin/python3
import sys

def reducer(k, values):
    ############  Begin   ################
    for value in sorted(list(set(values))):
        print("%s\t%s" %(k,value))
    ############   End    ################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    for line in sys.stdin:
        line = line.strip()
        akey, avalue = line.split('\t')
        
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
  
            values.append(avalue)
            current_key = akey
    
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
        
3) 求挖掘其中的父子辈关系,给出祖孙辈关系的表格
# mapper.py
#! /usr/bin/python3
import sys
def main():
    for line in sys.stdin:
        line = line.strip()
        if line.startswith('child'):
            pass
        else:
            mapper(line)
           
def mapper(line):
    ###############  Begin   ############
    child, parent = line.split(' ')
    print("%s\t-%s" % (child, parent))
    print("%s\t+%s" % (parent, child))
    
    ###############   End    #############

if __name__ == '__main__':
    main()

# Reduce.py
#! /usr/bin/python3
import sys

def reducer(k, values):
    ##############    Begin    ################
    grandparents = []
    grandson = []
    for v in values:
        if v.startswith('-'):
            grandparents.append(v[1:])
        else:
            grandson.append(v[1:])
    for i in grandson:
        for j in grandparents:
            print("%s\t%s" % (i, j))
    ##############   End      #################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    print("grand_child\tgrand_parent")
    for line in sys.stdin:
        line = line.strip()
        try:
            akey, avalue = line.split('\t')
        except:
            continue
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(avalue)
            current_key = akey
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()

4) 数据清洗
#! /usr/bin/python3
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time

# 获取“省市代码:省市名称”项并保存在字典regions中;
# 获取“电话号码:姓名”项并保存在字典userphones中。
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()

def main():
    # 正确输出utf-8编码的汉字
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    # 输出形如“邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市”的字符串
    # 本题不需要reduce阶段,输出题目要求的内容即可,不需要使用“键\t值”的形式。
    ##########  begin      ##############
    items = line.split(',')
    caller = userphones.get(items[0])
    receiver = userphones.get(items[1])
    start_time = int(items[2])
    end_time = int(items[3])
    region_caller = regions.get(items[4])
    region_receiver = regions.get(items[5])
    print(caller,receiver,sep = ',',end = ',')
    print(','.join(items[:2]), end = ',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)),end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)),end=',')
    print(str(end_time - start_time), end = ',')
    print(region_caller, region_receiver, sep = ',')    
    

    ###########  End  #################

if __name__ == '__main__':
    main()
---------------------------------------------------------------
# dbhelper.py
import pymysql
import sys
import codecs

class DBHelper:
    def get_connection():
        # 根据题目提供的凭据建立到mysql服务器的连接"conn",注意字符集指定为"utf8mb4"
        ########  Begin   ############
        conn = pymysql.connect(
            host = 'localhost',
            user = 'root',
            password = '123123',
            db = 'mydb',
            port = 3306,
            charset = 'utf8mb4'
        )
        ########  End    ############    
        return conn

    @classmethod
    def get_region(cls):
        conn = cls.get_connection()
        regions = dict()
        with conn.cursor() as cur:
            #从数据库中查询所有的省市代码和省市名称,并保存到字典regions中。
            ############  Begin ###################
            sqltxt = 'select CodeNum, Address from allregion;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                regions[row[0]] = row[1].strip()
     
            ############  End    #################
        conn.close()
        return regions

    @classmethod
    def get_userphones(cls):
        conn = cls.get_connection()
        userphones = dict()
        with conn.cursor() as cur:
        #从数据库中查询所有的电话号码和对应的姓名,并保存到字典userphones中。
        ############  Begin ###################
            sqltxt = 'select phone, trueName from userphone;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                userphones[row[0]] = row[1]    
            ############  End    #################
        conn.close()
        return userphones

def main():
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    region = DBHelper.get_region()
    users = DBHelper.get_userphones() 
    '''
    for k, v in region.items():
        print(k, ':', v)
    print('-------------')

    for k, v in users.items():
        print(k, ':', v)
    '''
if __name__ == '__main__':
    main()

5.SparkRDD

大数据期末总结复习

Spark-submit计算圆周率大数据期末总结复习

大数据期末总结复习

1).读取外部数据集创建RDD

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == '__main__':
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    
    # 文本文件 RDD 可以使用创建 SparkContext 的textFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
    # 2.读取本地文件,URI为:/root/wordcount.txt
    distFile = sc.textFile("/root/wordcount.txt")
    
    # 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
    s = distFile.collect()
    
    # 4.打印 rdd 的内容
    print(s)

    # 5.停止 SparkContext
    sc.stop()

    #********** End **********#

2). 使用 map 算子,将偶数转换成该数的平方;奇数转换成该数的立方。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")

    # 2.创建一个1到5的列表List
    list1 = [1,2,3,4,5] 

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(list1)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
    需求:
        偶数转换成该数的平方
        奇数转换成该数的立方
    """
    # 5.使用 map 算子完成以上需求
    rdd_map = rdd.map(lambda x : x*x if x%2 == 0 else x*x*x)
    
    # 6.使用rdd.collect() 收集完成 map 转换的元素
    print(rdd_map.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

3).使用 mapPartitions 算子,将字符串与该字符串的长度组合成一个元组。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

#********** Begin **********#
def f(iterator):
    list1 = []
    for x in iterator:
        length = len(x)
        list1.append((x,length))
    return list1

#********** End **********#

if __name__ == "__main__":
    #********** Begin **********#
    
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
    data = ["dog", "salmon", "salmon", "rat", "elephant"]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
    需求:
        将字符串与该字符串的长度组合成一个元组,例如:
        dog  -->  (dog,3)
        salmon   -->  (salmon,6)
    """

    # 5.使用 mapPartitions 算子完成以上需求
    partitions = rdd.mapPartitions(f)

    # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
    print(partitions.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

4).使用 filter 算子,过滤掉rdd(1, 2, 3, 4, 5, 6, 7, 8) 中的所有奇数。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个1到8的列表List
    data = [1,2,3,4,5,6,7,8]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
    需求:
        过滤掉rdd中的奇数
    """
    # 5.使用 filter 算子完成以上需求
    rdd_filter = rdd.filter(lambda x:x%2==0)

    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(rdd_filter.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

5) . 使用 flatMap 算子,合并RDD的元素:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
       #********** Begin **********#
       
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
    data = [[1,2,3],[4,5,6],[7,8,9]]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
    """
        使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
        需求:
            合并RDD的元素,例如:
                            ([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)
                            ([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)
        """
    # 5.使用 filter 算子完成以上需求
    flat_map = rdd.flatMap(lambda x:x)
    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(flat_map.collect())
    # 7.停止 SparkContext
    sc.stop()
    #********** End **********#

6). 使用 distinct 算子,将 rdd 中的数据进行去重。


# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
    data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素去重,例如:
                        1,2,3,3,2,1  --> 1,2,3
                        1,1,1,1,     --> 1
       """
    # 5.使用 distinct 算子完成以上需求
    distinct = rdd.distinct()

    # 6.使用rdd.collect() 收集完成 distinct 转换的元素
    print(distinct.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

7). 使用 sortBy 算子,将 rdd 中的数据进行排序(升序)

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())


    """
       使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            5,4,3,1,2  --> 1,2,3,4,5
       """
    # 5.使用 sortBy 算子完成以上需求
    by = rdd.sortBy(lambda x: x)

    # 6.使用rdd.collect() 收集完成 sortBy 转换的元素
    print(by.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

8). 使用 sortByKey 算子,将 rdd 中的数据进行排序(升序)。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
    data=[('B',1),('A',2),('C',3)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            [(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]
       """
    # 5.使用 sortByKey 算子完成以上需求
    key = rdd.sortByKey()

    # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
    print(key.collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

9). 使用mapValues算子,将偶数转换成该数的平方,奇数转换成该数的立方

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
    data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
           使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
           需求:
               元素(key,value)的value进行以下操作:
                                                偶数转换成该数的平方
                                                奇数转换成该数的立方
    """
    # 5.使用 mapValues 算子完成以上需求
    values = rdd.mapValues(lambda x: x*x if x%2==0 else x*x*x)

    # 6.使用rdd.collect() 收集完成 mapValues 转换的元素
    print(values.collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

10).使用 reduceByKey 算子,将 rdd(key-value类型) 中的数据进行值累加。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
          使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
          需求:
              元素(key-value)的value累加操作,例如:
                                                (1,1),(1,1),(1,2)  --> (1,4)
                                                (1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)
    """
    # 5.使用 reduceByKey 算子完成以上需求
    # ruduceBy = rdd.reduceByKey(lambda x,y:x+y).collect()

    # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
    print(rdd.reduceByKey(lambda x,y:x+y).collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

11).Actions - 常用算子

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2];

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.收集rdd的所有元素并print输出
    print(rdd.collect())

    # 5.统计rdd的元素个数并print输出
    print(rdd.count())

    # 6.获取rdd的第一个元素并print输出
    print(rdd.first())

    # 7.获取rdd的前3个元素并print输出
    print(rdd.take(3))

    # 8.聚合rdd的所有元素并print输出
    print(rdd.reduce(lambda x,y:x+y))

    # 9.停止 SparkContext
    sc.stop()

    # ********** End **********#

6. SparkSQL

大数据期末总结复习

1). 使用Spark SQL统计战斗机飞行性能


# coding=utf-8


from pyspark.sql import SparkSession

#**********Begin**********#

#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()
#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json")
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球飞行速度排名前三的战斗机
sqlDF = spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) as SPEED, `名称` from table1 order by SPEED desc LIMIT 3")
#保存结果
sqlDF.write.format("csv").save("/root/airspark")

#**********End**********#
spark.stop()

2). 使用Spark SQL统计各个研发单位研制战斗机占比


# coding=utf-8


from pyspark.sql import SparkSession

#**********Begin**********#

#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()
    
#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")

#统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
sqlDF = spark.sql("select concat(cast(round(count(`研发单位`)*100/(select count(`研发单位`) from table1 where `研发单位` is not null and `名称` is not null ),2) as float),'%'),`研发单位` from table1 where `研发单位` is not null and `名称` is not null group by `研发单位`")

#保存结果
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#

spark.stop()

3).出租车轨迹数据清洗

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    
    #**********begin**********#
    df = spark.read.option("header", True).option("delimiter", "\t").csv("/root/data.csv")
    df.createOrReplaceTempView("table1")
    DataFrame = spark.sql(
    """
    select\
    regexp_replace(TRIP_ID,'[$@]','')  TRIP_ID,regexp_replace(CALL_TYPE,'[$@]','') CALL_TYPE,regexp_replace(ORIGIN_CALL,'[$@]','') ORIGIN_CALL,regexp_replace(TAXI_ID,'[$@]','') TAXI_ID,regexp_replace(ORIGIN_STAND,'[$@]','') ORIGIN_STAND,regexp_replace(TIMESTAMP,'[$@]','') TIMESTAMP, regexp_replace(POLYLINE,'[$@,-.\\\\[\\\\]]','') POLYLINE\
    from table1
    """)
    DataFrame.show()
    #**********end**********#
    spark.stop()

上一篇:设置椭圆形的CSS时,高度的设置是宽度的多少


下一篇:多线程面试题(Google)