@TOC
## Relevant POM dependencies

Add the dependencies that this example's Flink program needs:
```xml
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-flink</artifactId>
    <version>2.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
```
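Note that the `_2.11` suffix on the Flink artifacts is the Scala version they were built against; it must be consistent across all Flink dependencies, and the connector version (1.10.0 here) should match the Flink version running on your cluster.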
## Serialization and deserialization

The job shuttles raw bytes between Kafka and Pulsar, so we need a schema that implements both serialization and deserialization. The implementation class is as follows:
```java
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;

/**
 * Identity schema: passes Kafka record bytes straight through to Pulsar
 * without interpreting the payload.
 */
public class ByteArraySchema extends AbstractDeserializationSchema<byte[]> implements SerializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }
}
```
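Because both methods are the identity on `byte[]`, records flow through the job unmodified. A minimal round-trip sketch (a hypothetical `ByteArraySchemaDemo`, not part of the original job, assuming the class above is on the classpath) illustrates this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ByteArraySchemaDemo {
    public static void main(String[] args) {
        ByteArraySchema schema = new ByteArraySchema();
        byte[] payload = "hello pulsar".getBytes(StandardCharsets.UTF_8);
        // serialize and deserialize are both identity functions on byte[],
        // so a payload survives a round trip unchanged
        byte[] roundTripped = schema.serialize(schema.deserialize(payload));
        System.out.println(Arrays.equals(payload, roundTripped)); // prints true
    }
}
```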
## Implementation

The job itself wires a FlinkKafkaConsumer source to a FlinkPulsarProducer sink:
```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

import java.util.Properties;

public class KafkaToPulsarCommon {

    public static StreamExecutionEnvironment getEnv() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Suppress sysout logging
        env.getConfig().disableSysoutLogging();
        // Restart strategy: retry indefinitely, waiting 60 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(60)));
        // Checkpoint every 5 minutes with exactly-once semantics
        env.enableCheckpointing(300000, CheckpointingMode.EXACTLY_ONCE);
        // Abort a checkpoint if it takes longer than 5 minutes to complete
        env.getCheckpointConfig().setCheckpointTimeout(Time.seconds(300).toMilliseconds());
        // Wait at least 30 seconds after one checkpoint completes before starting the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Time.seconds(30).toMilliseconds());
        // Retain externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    public static Properties kafkaProperties(String broker, String groupid) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", broker);
        // Re-discover newly added Kafka partitions every 5 minutes
        props.setProperty("flink.partition-discovery.interval-millis", String.valueOf(5 * 60 * 1000));
        props.setProperty("group.id", groupid);
        return props;
    }

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String topic = params.get("topic", "push_single_task");
        final String slb = params.get("clusterAddr", "localhost:9092");
        final String groupid = params.get("groupid", "kafka2pulsar_test");
        final String serviceUrl = params.get("serviceUrl", "pulsar://localhost:6650");
        final String outputTopic = params.get("outputTopic", "persistent://test/ns1/river_test1");
        final int sinkP = params.getInt("sinkP", 1);
        final String offset = params.get("offset", "groupid");

        final StreamExecutionEnvironment env = getEnv();

        // Kafka source: consume records as raw byte arrays
        FlinkKafkaConsumer<byte[]> consumer = new FlinkKafkaConsumer<>(
                topic,
                new KafkaDeserializationSchemaWrapper<>(new ByteArraySchema()),
                kafkaProperties(slb, groupid));
        // By default the consumer starts from the committed group offsets;
        // "earliest" / "latest" override that behavior
        if ("earliest".equals(offset)) {
            consumer.setStartFromEarliest();
        } else if ("latest".equals(offset)) {
            consumer.setStartFromLatest();
        }
        DataStream<byte[]> stream = env.addSource(consumer);

        // Pulsar client configuration: plain connection, no authentication
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setAuthentication(new AuthenticationDisabled());

        // Pulsar producer configuration
        ProducerConfigurationData producerConf = new ProducerConfigurationData();
        // Round-robin messages across the partitions of the target topic
        producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
        producerConf.setTopicName(outputTopic);
        // Block instead of failing when the producer's pending-message queue is full
        producerConf.setBlockIfQueueFull(true);

        // Pulsar sink; no key or properties extractors are needed here, so null is passed for both
        stream.addSink(new FlinkPulsarProducer<>(
                clientConf,
                producerConf,
                new ByteArraySchema(),
                null,
                null
        )).setParallelism(sinkP);

        env.execute("kafka2pulsar");
    }
}
```
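To submit the job, something along the lines of `flink run -c KafkaToPulsarCommon <your-job-jar> --offset earliest --clusterAddr kafka-broker:9092 --serviceUrl pulsar://pulsar-host:6650 --outputTopic persistent://test/ns1/river_test1` should work (the jar name and host addresses are placeholders): each `--key value` pair maps to a `ParameterTool` option read in `main()`, and any option you omit falls back to the default shown above.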