Reading from Kafka and writing to Pulsar with Flink

Related pom dependencies

Add the dependencies this Flink job needs to your pom. Note the _2.11 suffix on the Flink artifacts: it must match the Scala version of your Flink distribution.

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-flink</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

Serialization and deserialization

We need a schema that handles both deserialization (for reading from Kafka) and serialization (for writing to Pulsar). Since the job just forwards raw bytes, the implementation is a simple pass-through:

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;

// Pass-through schema: hands byte[] records through unchanged, so it can serve both
// as the deserializer for the Kafka source and the serializer for the Pulsar sink.
public class ByteArraySchema extends AbstractDeserializationSchema<byte[]> implements SerializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

}
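
Because both methods are identity functions, a payload survives a serialize/deserialize round trip untouched. A minimal check (a hypothetical snippet, not part of the original job; the class name is made up):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ByteArraySchemaCheck {
    public static void main(String[] args) {
        ByteArraySchema schema = new ByteArraySchema();
        byte[] payload = "hello pulsar".getBytes(StandardCharsets.UTF_8);
        // serialize and deserialize both return the input array unchanged
        byte[] roundTripped = schema.deserialize(schema.serialize(payload));
        System.out.println(Arrays.equals(payload, roundTripped)); // prints true
    }
}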

Implementation

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

import java.util.Properties;

public class KafkaToPulsarCommon {
    public static StreamExecutionEnvironment getEnv() {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // disable sysout logging of execution status updates
        env.getConfig().disableSysoutLogging();
        // exactly-once checkpointing semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // restart strategy: retry indefinitely, waiting 60 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(60)));
        // take a checkpoint every 5 minutes
        env.enableCheckpointing(300000, CheckpointingMode.EXACTLY_ONCE);
        // how long a checkpoint may run before it is aborted
        env.getCheckpointConfig().setCheckpointTimeout(Time.seconds(300).toMilliseconds());
        // minimum pause after one checkpoint completes before the next may start
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Time.seconds(30).toMilliseconds());
        // keep the latest checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        return env;
    }

    public static Properties kafkaProperties(String broker, String groupid) {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", broker);
        // let the Flink consumer discover newly added Kafka partitions every 5 minutes
        props.setProperty("flink.partition-discovery.interval-millis", String.valueOf(5 * 60 * 1000));
        props.setProperty("group.id", groupid);

        return props;
    }

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final String topic = params.get("topic", "push_single_task");
        final String slb = params.get("clusterAddr", "localhost:9092");
        final String groupid = params.get("groupid", "kafka2pulsar_test");
        final String serviceUrl = params.get("serviceUrl", "pulsar://localhost:6650");
        final String outputTopic = params.get("outputTopic", "persistent://test/ns1/river_test1");
        final int sinkP = params.getInt("sinkP", 1);
        final String offset = params.get("offset", "groupid");

        final StreamExecutionEnvironment env = getEnv();
        FlinkKafkaConsumer<byte[]> consumer = new FlinkKafkaConsumer<>(
                topic,
                new KafkaDeserializationSchemaWrapper<>(new ByteArraySchema()),
                kafkaProperties(slb, groupid));
        // "earliest" / "latest" force the start position; any other value (including the
        // default "groupid") keeps Flink's default of starting from committed group offsets
        if ("earliest".equals(offset)) {
            consumer.setStartFromEarliest();
        }

        if ("latest".equals(offset)) {
            consumer.setStartFromLatest();
        }

        DataStream<byte[]> stream = env.addSource(consumer);

        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setAuthentication(new AuthenticationDisabled());
        ProducerConfigurationData producerConf = new ProducerConfigurationData();
        // spread keyless messages round-robin across the partitions of the output topic
        producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
        producerConf.setTopicName(outputTopic);
        // block and apply backpressure instead of failing when the producer queue is full
        producerConf.setBlockIfQueueFull(true);

        stream.addSink(new FlinkPulsarProducer<>(
                clientConf,
                producerConf,
                new ByteArraySchema(),
                null,   // key extractor: null writes messages without a key (see note below)
                null    // property extractor: null attaches no per-message properties
        )).setParallelism(sinkP);

        env.execute("kafka2pulsar");
    }
}
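
A note on the two null arguments passed to FlinkPulsarProducer: in this connector version they are the key extractor and the property extractor, so the sink writes messages without a key and routing falls back to the RoundRobinPartition mode set above. If key-based routing is needed, an extractor can be supplied in place of the first null. The sketch below is only an illustration under assumptions: it presumes the PulsarKeyExtractor interface shipped with pulsar-flink 2.6.1 and invents a keying scheme (first 8 bytes of the payload); verify the interface against your connector version.

import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;

import java.nio.charset.StandardCharsets;

public class PayloadPrefixKeyExtractor implements PulsarKeyExtractor<byte[]> {
    @Override
    public String getKey(byte[] payload) {
        // hypothetical keying scheme: the first 8 bytes of the payload become the message key
        return new String(payload, 0, Math.min(8, payload.length), StandardCharsets.UTF_8);
    }
}

Finally, the parameters at the top of main are read with ParameterTool.fromArgs, so each one is passed to the job as --name value on the command line, e.g. --offset earliest to reprocess the Kafka topic from the beginning.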