流批一体计算引擎-10-[Flink]中的常用算子和DataStream转换

pyflink 处理 kafka数据
在这里插入图片描述

1 DataStream API 示例代码

从非空集合中读取数据,并将结果写入本地文件系统。

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(StreamingFileSink
                .for_row_format('output', Encoder.simple_string_encoder())
                .build())
    env.execute("tutorial_job")

if __name__ == '__main__':
    tutorial()

(1)DataStream API应用程序首先需要声明一个执行环境
StreamExecutionEnvironment,这是流式程序执行的上下文。
后续将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

(2)声明数据源
一旦创建了 StreamExecutionEnvironment 之后,可以使用它来声明数据源。
数据源从外部系统(如Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到Flink作业里。
为了简单起见,本次使用元素集合作为数据源。
这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的ROW类型)。

ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

(3)转换操作或写入外部系统
现在可以在这个数据流上执行转换操作,或者使用 sink 将数据写入外部系统。
本次使用StreamingFileSink将数据写入output文件目录中。

ds.add_sink(StreamingFileSink
   .for_row_format('output', Encoder.simple_string_encoder())
   .build())

(4)执行作业
最后一步是执行真实的 PyFlink DataStream API作业。
PyFlink applications是懒加载的,并且只有在完全构建之后才会提交给集群上执行。
要执行一个应用程序,只需简单地调用env.execute(job_name)。

env.execute("tutorial_job")

在这里插入图片描述

2 自定义转换函数的三种方式

三种方式支持用户自定义函数。

2.1 Lambda函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

mapped_stream.print()
env.execute("tutorial_job")

2.2 python函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def my_map_func(value):
   return value + 1

def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

2.3 接口函数[复杂]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction


class MyMapFunction(MapFunction):

   def map(self, value):
      return value + 1


def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 41, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

3 常用算子

参考官网算子

3.1 map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

mapped_stream.print()
env.execute("tutorial_job")

在这里插入图片描述

3.2 flat_map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
out = data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.3 filter【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件


def my_func(value):
    if value % 2 == 0:
        return value
    
    
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
filtered_stream = data_stream.filter(my_func)

filtered_stream.print()
env.execute("tutorial_job")

3.4 window_all【DataStream->AllWindowedStream】

根据某些特征(例如,最近 100毫秒秒内到达的数据)对所有流事件进行分组。
所有的元素。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.4.1 apply【AllWindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterable

from pyflink.common import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AllWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindow

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):
    def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[0]
        yield sum


data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
out = all_window_stream.apply(MyAllWindowFunction())

out.print()
env.execute("tutorial_job")

3.5 key_by【DataStream->KeyedStream】

需要结合reduce或window算子使用。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())

3.6 reduce【KeyedStream->DataStream】增量

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
out = key_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))

out.print()
env.execute("tutorial_job")

在这里插入图片描述
在相同 key 的数据流上“滚动”执行 reduce。
将当前元素与最后一次 reduce 得到的值组合然后输出新值。

3.7 window【KeyedStream->WindowedStream】

在已经分区的 KeyedStreams 上定义 Window。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.7.1 apply【WindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterable

from pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindow

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]):
    def apply(self, key: int, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[0]
        yield key, sum


data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.apply(MyWindowFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.7.2 reduce【WindowedStream->DataStream】

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows,TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))

out.print()
env.execute("tutorial_job")

在这里插入图片描述
方式二

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, ReduceFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyReduceFunction(ReduceFunction):
    def reduce(self, value1, value2):
        return value1[0] + value2[0], value1[1]

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(MyReduceFunction())

out.print()
env.execute("tutorial_job")

3.8 union【DataStream*->DataStream】

将两个或多个数据流联合来创建一个包含所有流中数据的新流。
注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
out = data_stream2.union(data_stream1)

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9 connect【DataStream,DataStream->ConnectedStream】

stream_1 = ...
stream_2 = ...
connected_streams = stream_1.connect(stream_2)

3.9.1 CoMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)


class MyCoMapFunction(CoMapFunction):

    def map1(self, value):
        return value[0] *100, value[1]

    def map2(self, value):
        return value[0], value[1] + 'flink'

out = connected_stream.map(MyCoMapFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9.2 CoFlatMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoFlatMapFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)


class MyCoFlatMapFunction(CoFlatMapFunction):

    def flat_map1(self, value):
        for i in range(value[0]):
            yield value[0]*100

    def flat_map2(self, value):
        yield value[0] + 10

out = connected_stream.flat_map(MyCoFlatMapFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

4 对接kafka输入json输出json

输入{“name”:“中文”}
输出{“name”:“中文结果”}

from pyflink.common import SimpleStringSchema, WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, \
    KafkaRecordSerializationSchema
import json

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)

brokers = "IP:9092"

# 读取kafka
source = KafkaSource.builder() \
    .set_bootstrap_servers(brokers) \
    .set_topics("flink_source") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

ds1 = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds1.print()

# 处理流程
def process_fun(line):
    data_dict = json.loads(line)
    result_dict = {"result": data_dict.get("name", "无")+"结果"}
    return json.dumps(result_dict, ensure_ascii=False)

ds2 = ds1.map(process_fun, Types.STRING())
ds2.print()
上一篇:c语言——字符函数与字符串函数-二 字符串函数


下一篇:本地启动ollama大语言模型