PySpark3.4.4_基于StreamingContext实现网络字节流统计分析

网络字节流与嵌套字节流的区别

  1. 概念解释

    • 网络嵌套字节流
      • 在网络编程的情境下,网络嵌套字节流通常是指将字节流(字节序列)以一种分层或者包含的方式进行组织,用于在网络传输过程中更好地处理数据。例如,在一个复杂的网络协议栈中,高层协议的数据单元(往往也是字节流形式)可以嵌套在底层协议的字节流之中。这就好比包裹的嵌套,外层包裹可能包含了内层包裹的相关信息以及内层包裹本身。以 HTTP 协议在 TCP/IP 协议之上传输为例,HTTP 消息(本身是字节流)被嵌套在 TCP 的字节流中进行传输。TCP 协议负责将 HTTP 消息切割成合适的片段(字节流形式),加上 TCP 头信息(也是字节流),然后通过网络发送。接收端的 TCP 协议先处理接收到的字节流,提取出 HTTP 消息的字节流部分,再交给上层的 HTTP 协议处理。
    • 套字节流
      • 这个概念不是很常见,如果理解为 “包裹字节流” 的意思,和网络嵌套字节流有相似之处。不过,“套字节流” 可能更强调简单的封装形式,即将一个字节流作为另一个字节流的一部分进行简单包装。比如,在加密通信中,原始的字节流(如要传输的文件内容字节流)被加密算法处理后,会生成一个新的字节流,这个新字节流可以看作是原始字节流被 “套” 上了一层加密后的字节流。它可能没有像网络嵌套字节流那样涉及复杂的网络协议层次关系。
  2. 应用场景区别

    • 网络嵌套字节流
      • 广泛应用于网络通信的各个层次。在构建网络服务器和客户端应用时,不同层次的网络协议交互都涉及网络嵌套字节流。例如,在电子邮件传输(SMTP、POP3 等协议)中,邮件内容字节流被嵌套在相应的协议字节流中在网络上传输。它主要用于保证数据在不同网络环境和协议间的正确传递和解析,确保数据能够从源端的应用层通过层层协议封装,经过网络传输,最终在目的端的应用层被正确还原。
    • 套字节流
      • 更多地用于数据安全和简单的数据封装场景。如在数字签名的应用中,消息的字节流被 “套” 上签名信息的字节流,用于验证消息的来源和完整性。或者在数据存储中,为了区分不同类型的数据,将数据字节流 “套” 上一个标识头字节流进行存储,方便后续读取和分类处理。
  3. 处理方式区别

    • 网络嵌套字节流
      • 需要严格按照网络协议栈的规则进行处理。在发送端,数据从高层协议开始,一层一层地进行字节流的嵌套和封装,添加每层协议所需的头部、尾部等信息。在接收端,则是相反的过程,从最外层的协议字节流开始,逐步解包和解析,根据每层协议的规范提取出内层协议的字节流,直到最终得到应用层的数据字节流。这需要对各种网络协议的格式、功能和交互流程有深入的了解。
    • 套字节流
      • 处理相对简单,主要关注封装和提取两个操作。在封装时,根据具体的需求添加包裹字节流(如加密后的字节流添加到原始字节流外层)。在提取时,按照预先定义的规则(如加密算法对应的解密规则、数据标识头的解析规则等)去除外层字节流,获取内部的原始字节流或者所需的数据。

PySpark代码开发

需要在ubuntu环境下或windows环境下,提前安装好spark执行环境

软件说明:

  1. spark 3.4.4
  2. python 3.9.20
  3. java jdk1.8.0_431

代码说明

DataSourceSoket.py 用于模拟生成实时字节流数据的脚本

# coding:utf8
import random
from socket import socket

server = socket()

server.bind(('localhost', 9999))
server.listen(1)
while True:
    # 为了方便识别,输出一个"I'm waiting the connect ..."
    print("I'm waiting the connect ...")
    conn, addr = server.accept()
    print("Connected by {0}".format(addr))
    print(f"Connected by {addr}")
    # 输出发送数据
    # 自定义10条中文数据在一个数据容器里,并随机选取一条中文数据集输出
    # 步骤1:创建一个列表作为数据容器
    data_container = []

    # 步骤2:向列表中添加10条不同的中文数据
    chinese_data = [
        "你好,世界",
        "今天天气真好",
        "学习是一件快乐的事",
        "分享知识,传递快乐",
        "探索未知的世界",
        "坚持就是胜利",
        "努力不懈,梦想终会实现",
        "失败乃成功之母",
        "平凡造就非凡",
        "相信自己,你是最棒的",
        "I like Spark",
        "I like Flink",
        "I like Hadoop"
    ]

    data_container.extend(chinese_data)

    # 步骤3:使用random.choice()随机选择并输出一条数据
    random_item = random.choice(data_container)
    print(random_item)
    conn.sendall(random_item.encode())
    conn.close()
    print("Connection closed")

pysparkStreamingNetwordCountCN.py  SparkStreaming处理实时数据流

# coding:utf8

from __future__ import print_function

import os
import sys
import jieba
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 设置环境变量,确保指向正确的 Java 解释器
os.environ['JAVA_HOME'] = '/opt/HadoopEco/jdk1.8.0_431'  # 替换为你的 JDK 8 安装路径
os.environ['SPARK_HOME'] = '/opt/HadoopEco/spark-3.4.4-bin-without-hadoop'

# 加载停用词表
def load_stopwords(file_path):
    """
    从指定文件或文件夹中加载停用词列表。

    参数:
    file_path (str): 停用词文件或文件夹的路径。

    返回:
    set: 包含停用词的集合。
    """
    stopwords = set()
    try:
        if os.path.isfile(file_path):
            with open(file_path, 'r', encoding='utf-8') as f:
                stopwords.update(line.strip() for line in f)
        elif os.path.isdir(file_path):
            for filename in os.listdir(file_path):
                file_full_path = os.path.join(file_path, filename)
                if os.path.isfile(file_full_path):
                    with open(file_full_path, 'r', encoding='utf-8') as f:
                        stopwords.update(line.strip() for line in f)
        else:
            print(f"Error: The path {file_path} is neither a file nor a directory.")
    except FileNotFoundError:
        print(f"Error: The file or directory {file_path} does not exist.")
    except PermissionError:
        print(f"Error: Permission denied for the file or directory {file_path}.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return stopwords

# 替换为你的停用词表路径或文件夹路径
stopwords = load_stopwords(sys.argv[3])  # 或 'path/to/stopwords_folder'

def sparkstreamingnetworkcount():
    global sc, ssc, lines
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 10)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    def split_words(line):
        try:
            # 使用 jieba 进行中文分词
            chinese_words = jieba.lcut(line.strip())
            # 使用空格进行英文分词
            english_words = line.strip().split(" ")
            # 合并分词结果并过滤掉空字符串
            words = set(chinese_words + english_words) - {''}
            # 过滤掉停用词
            filtered_words = [word.lower() for word in words if word not in stopwords]
            return filtered_words
        except Exception as e:
            print(f"Error processing line: {line}, Error: {e}", file=sys.stderr)
            return []

    counts = lines.flatMap(split_words).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: networkcount.py <hostname> <port> <stopwords>", file=sys.stderr)
        exit(-1)

    sparkstreamingnetworkcount()

运行时的运行参数配置

运行结果如下

DataSourceSoket.py

pysparkStreamingNetwordCountCN.py 运行结果

注意事项:

1. 需要先启动 DataSourceSocket.py, 在启动 pysparkStreamingNetwordCountCN.py

上一篇:TensorFlow 的基本概念和使用场景


下一篇:Redis6为什么引入了多线程?