Spark作业

Streaming

新建一个txt文件用来保存黑名单
Spark作业
在系统上找到sc

import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="myAppName")

有的电脑直接就能找到,就不需要上面这段代码了

调取Streaming

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=pyspark.SparkContext.getOrCreate(conf)
ssc = StreamingContext(sc,10)
lines = ssc.socketTextStream('127.0.0.1',8888)

看起来这个SparkConf像构建一个工作区,通过监听这个工作区来得到值。然后绑定这个窗口到8888端口。
由于是靠python来写的,与书上不一致,只有猜一下他的作用。

接下来就要对进来的数据进行筛选了。先测试一下(防止重启太麻烦)

blackForm = sc.textFile("./blackForm.txt")  # 读取黑名单
blackForm=blackForm.map(lambda x:x.split(" "))   # 空格切分
rdd1 = sc.parallelize([["1111","hadoop"],["2222",'spark'],["3333","hive"]])  # 测试rdd
rdd1=rdd1.map(lambda x:[x[1],x[0]])
rdd1.leftOuterJoin(blackForm).collect()

由于要求是用左连接。就改一下输入值的顺序
效果:
Spark作业
按这个图来说。理论上是可行的。

那就直接开始在启动程序里写

blackForm = sc.textFile("./blackForm.txt")
blackForm=blackForm.map(lambda x:x.split(" "))
data = lines.map(lambda x:x.split(" "))  # 切分
data = data.map(lambda x:[x[1],x[0]]) # 转格式
data = data.transform(lambda x:x.leftOuterJoin(blackForm))
data = data.filter(lambda x:x[1][1]=='false') # 筛选
data = data.map(lambda x:(x[0],x[1][0])) # 转输出格式
data.pprint()
ssc.start()

启动这段代码后在本机的窗口命令行输入nc -l -p 8888
然后输入
Spark作业
在监听行可以看到黑名单中为true的都被删选掉了
Spark作业
这里的转格式反了,但并不影响运行。如果输入nc -l -p … 无法输入。那么就换个奇怪点的端口。保证这个端口没被占用就行了。

Spark GraphFrame

读入数据查看格式。

test=sc.textFile("d:/data/web-Google.txt")
test.take(10)

Spark作业
暂未可知。这明显构造不出来呀。用GraphFrame。Spark里面的DataFrame无法只创造1列啊。这构造出来一点也好理解啊。书上的GraphX应该才能构造出来。

看了半天,这样应该是可行的

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from graphframes import *
from pyspark.sql.types import *

# 启动pyspark需要带参数
spark = SparkSession.builder.appName("testGraph") \
.master('local') \
.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
countId = sc.textFile('D:/data/web-Google.txt').map(lambda x:x.split("\t")).map(lambda x:x[0]).distinct().map(lambda x:(int(x),x))
schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("from", StringType(), True),
    ])
df_v = sqlContext.createDataFrame(countId,schema)

这里的countId就是找到有多少不重复的ID,根据ID创建一个一模一样的value这样就可以用来创建DataFrame了,到后面再把这列from去掉

构造边

schema = StructType([
        # true代表不为空
        StructField("src", IntegerType(), True),#
        StructField("dst", IntegerType(), True),#
        StructField("relation", StringType(), True),
    ])
rdd2 = sc.textFile('D:/data/web-Google.txt') \
.map(lambda x:x.split("\t")) \
.map(lambda x:(int(x[0]),int(x[1]),"link"))
df_e = sqlContext.createDataFrame(rdd2, schema)

构造图

g = GraphFrame(df_v,df_e)

最后出来的图效果
Spark作业
查询顶点和边的个数
Spark作业
countId即是我们的顶点。做一个计数则是顶点个数为73945,边就是edges的条数为5105039

查看每个网页被链接次数

from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F
g_new.aggregateMessages(F.count(AM.msg).alias("count"),
    sendToSrc="1").show()

向顶点发送1,然后count计数
Spark作业
筛选出id>1的组成子图

# 好像就是这样,有可能有问题
overOne = g_new.filterVertices('id > 1')
上一篇:Python+大数据学习笔记(一)


下一篇:在Windows中安装PySpark环境