Streaming
新建一个txt文件用来保存黑名单
在系统上找到sc
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="myAppName")
有的电脑直接就能找到,就不需要上面这段代码了
调取Streaming
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=pyspark.SparkContext.getOrCreate(conf)
ssc = StreamingContext(sc,10)
lines = ssc.socketTextStream('127.0.0.1',8888)
看起来这个SparkConf像构建一个工作区,通过监听这个工作区来得到值。然后绑定这个窗口到8888端口。
由于是靠python来写的,与书上不一致,只有猜一下他的作用。
接下来就要对进来的数据进行筛选了。先测试一下(防止重启太麻烦)
blackForm = sc.textFile("./blackForm.txt") # 读取黑名单
blackForm=blackForm.map(lambda x:x.split(" ")) # 空格切分
rdd1 = sc.parallelize([["1111","hadoop"],["2222",'spark'],["3333","hive"]]) # 测试rdd
rdd1=rdd1.map(lambda x:[x[1],x[0]])
rdd1.leftOuterJoin(blackForm).collect()
由于要求是用左连接。就改一下输入值的顺序
效果:
按这个图来说。理论上是可行的。
那就直接开始在启动程序里写
blackForm = sc.textFile("./blackForm.txt")
blackForm=blackForm.map(lambda x:x.split(" "))
data = lines.map(lambda x:x.split(" ")) # 切分
data = data.map(lambda x:[x[1],x[0]]) # 转格式
data = data.transform(lambda x:x.leftOuterJoin(blackForm))
data = data.filter(lambda x:x[1][1]=='false') # 筛选
data = data.map(lambda x:(x[0],x[1][0])) # 转输出格式
data.pprint()
ssc.start()
启动这段代码后在本机的窗口命令行输入nc -l -p 8888
然后输入
在监听行可以看到黑名单中为true的都被删选掉了
这里的转格式反了,但并不影响运行。如果输入nc -l -p … 无法输入。那么就换个奇怪点的端口。保证这个端口没被占用就行了。
Spark GraphFrame
读入数据查看格式。
test=sc.textFile("d:/data/web-Google.txt")
test.take(10)
暂未可知。这明显构造不出来呀。用GraphFrame。Spark里面的DataFrame无法只创造1列啊。这构造出来一点也好理解啊。书上的GraphX应该才能构造出来。
看了半天,这样应该是可行的
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from graphframes import *
from pyspark.sql.types import *
# 启动pyspark需要带参数
spark = SparkSession.builder.appName("testGraph") \
.master('local') \
.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
countId = sc.textFile('D:/data/web-Google.txt').map(lambda x:x.split("\t")).map(lambda x:x[0]).distinct().map(lambda x:(int(x),x))
schema = StructType([
StructField("id", IntegerType(), True),
StructField("from", StringType(), True),
])
df_v = sqlContext.createDataFrame(countId,schema)
这里的countId就是找到有多少不重复的ID,根据ID创建一个一模一样的value这样就可以用来创建DataFrame了,到后面再把这列from去掉
构造边
schema = StructType([
# true代表不为空
StructField("src", IntegerType(), True),#
StructField("dst", IntegerType(), True),#
StructField("relation", StringType(), True),
])
rdd2 = sc.textFile('D:/data/web-Google.txt') \
.map(lambda x:x.split("\t")) \
.map(lambda x:(int(x[0]),int(x[1]),"link"))
df_e = sqlContext.createDataFrame(rdd2, schema)
构造图
g = GraphFrame(df_v,df_e)
最后出来的图效果
查询顶点和边的个数
countId即是我们的顶点。做一个计数则是顶点个数为73945,边就是edges的条数为5105039
查看每个网页被链接次数
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F
g_new.aggregateMessages(F.count(AM.msg).alias("count"),
sendToSrc="1").show()
向顶点发送1,然后count计数
筛选出id>1的组成子图
# 好像就是这样,有可能有问题
overOne = g_new.filterVertices('id > 1')