2021-04-12

Flink 程序Sink(数据输出)操作(5)自定义RabbitMq-Sink

自定义sink需要继承RichSinkFunction

ex:

public static class Demo extends RichSinkFunction<IN> {}

自定义RabbitMQ sink必要依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_2.12</artifactId>
    <version>1.12.2</version>
</dependency>

如上依赖所示,其实已经是有rabbitMQ的连接器,但是,此连接器只能简单的Queue 模式,我们的业务需求可能不能直接使用Queue模式,比如需要发送到交换机中(Fanout、Driect)等。如果有这种场景呢,我们需要连接器的基础上再自定义RabbitMQ Sink了。

定义配置

# 计算结果输出RabbitMQ配置
sink.rabbitmq.host=10.50.40.116
sink.rabbitmq.port=5673
sink.rabbitmq.username=admin
sink.rabbitmq.password=admin
sink.rabbitmq.exchange=vehicle-alarm-over-speeding

配置对应实体类

我们一会使用代码,读取我们的sink配置为对象,作为参数不断传递

@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
public class RabbitMqSinkProperties implements Serializable {
    /**
     * rabbitMQ ip
     */
    private String host;
    /**
     * 端口
     */
    private int port;
    /**
     * 用户民
     */
    private String userName;
    /**
     * 密码
     */
    private String passWord;
    /**
     * 交换机名
     */
    private String exchange;


}

定义公共MQSink类继承RichSinkFunction

此类主要作用是作为一个RabbitMQ sink的中间模板,其中定义MQ sink 与RabbitMQ 的连接与关闭,交换机的类型指定等等。

我们某个具体的MQ sink 只需要继承此中间模板类,传输我们之前定义的配置对象,即可快速在invoke中完成对RabbitMQ数据的输出

package com.leilei.sink;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lei
 * @version 1.0
 * @desc mq 公共模板类 我们我们如果有多个mq-sink  只需继承此类 即可
 * @date 2021-03-15 16:41
 */
@Slf4j
public class DataRichSinkFunction<IN> extends RichSinkFunction<IN> {

    // 配置对象  后续我们在定义具体实体类时用子类触发父类构造调用
    protected final RabbitMqSinkProperties rabbitMQSinkProperties;


    protected Connection connection;

    protected Channel channel;

    public DataRichSinkFunction(RabbitMqSinkProperties rabbitMQSinkProperties) {
        this.rabbitMQSinkProperties = rabbitMQSinkProperties;
    }


    /**
     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 设置RabbitMQ相关信息
        factory.setHost(rabbitMQSinkProperties.getHost());
        factory.setUsername(rabbitMQSinkProperties.getUserName());
        factory.setPassword(rabbitMQSinkProperties.getPassWord());
        factory.setPort(rabbitMQSinkProperties.getPort());

        // 创建一个新的连接
        connection = factory.newConnection();

        // 创建一个通道
        channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(rabbitMQSinkProperties.getExchange(), BuiltinExchangeType.FANOUT, true);
    }

    /**
     * 关闭连接 flink程序从启动到销毁只会执行一次
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        channel.close();
        connection.close();
    }

}

根据业务定义sink类继承公共MQSink模板

由于公共MQSink模板类中已经对rabbitMq做了一个连接通道的开启和关闭,因此我们当前sink无需关系自身与Mq的连接与关闭,直接在invoke方法中,将数据输出到Mq即可

public class DemoSinkFunction extends DataRichSinkFunction<String> {

    public OverSpeedAlarmSinkFunction(RabbitMqSinkProperties rabbitMQSinkProperties) {
         /**
         * 调用父类(DataRichSinkFunction)构造,完成父类中属性填充
         */
        super(rabbitMQSinkProperties);
    }
    /**
     * 数据输出到 rabbitMQSinkProperties 指定的交换机中
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println(LocalDateTime.now() + "发送数据:" + value);
        channel.basicPublish(rabbitMQSinkProperties.getExchange(), "", null, value.getBytes(StandardCharsets.UTF_8));
    }
}

输出测试

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//自定义数据源加载 
DataStreamSource<Location> source = env.addSource(new MyLocationSource());
//读取rabbitMq sink配置文件
String path = "RabbitMqSink.properties";
Props props = new Props(path);
RabbitMqSinkProperties sinkProperties = RabbitMqSinkProperties.builder()
    .host(props.getStr("sink.rabbitmq.host"))
    .port(props.getInt("sink.rabbitmq.port"))
    .userName(props.getStr("sink.rabbitmq.username"))
    .passWord(props.getStr("sink.rabbitmq.password"))
    .exchange(props.getStr("sink.rabbitmq.exchange"))
    .build();

// TODO  source进行数据处理 得到结果流
//将结果流用自定义的sink发送到rabbitmq
stream.addSink(new DemoSinkFunction(sinkProperties));

2021-04-12

2021-04-12

从控制台打印以及RabbitMQ-WEB页面看到,我们计算的结果,成功发送到了RabbitMQ的自定义交换机中, RabbitMQ SInk 示例完成!

上一篇:flume1.9安装


下一篇:任06_Flume案例_官方案例(配置)