创建Kafka结果表

创建Kafka结果表

Kafka结果表需要定义的DDL如下。


  1. create table sink_kafka (
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. PRIMARY KEY (messageKey)
  5. ) with (
  6. type = 'kafka010',
  7. topic = 'XXX',
  8. `group.id` = 'XXX',
  9. bootstrap.servers = 'ip1:port,ip2:port,ip3:port'
  10. );

注意:

  1. 创建Kafka结果表时,必须显示的指定PRIMARY KEY (messageKey)。
  2. 无论是阿里云Kafka还是自建Kafka,目前实时计算计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。

WITH参数

通用配置

参数 注释说明 备注
type kafka对应版本 必选,必须是 KAFKA08、KAFKA09、KAFKA010、KAFKA011中的一种,版本对应关系见表格下方。
topic 写入的topic topic名称

必选配置

kafka08必选配置:

参数 注释说明 备注
group.id N/A 消费组id
zookeeper.connect zk链接地址 zk连接id

kafka09/kafka010/kafka011必选配置:

参数 注释说明 备注
group.id N/A 消费组id
bootstrap.servers Kafka集群地址 Kafka集群地址

Kafka集群地址:

如果您的Kkafka是阿里云商业版,参见kafka商业版准备配置文档

如果您的Kafka是阿里云公测版,参见kafka公测版准备配置文档

可选配置参数


  1. "consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"

注意:
以上参数请用户根据实际业务需要选择使用。
其它可选配置项参考kafka官方文档进行配置:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs

Kafka版本对应关系

type Kafka 版本
Kafka08 0.8.22
Kafka09 0.9.0.1
Kafka010 0.10.2.1
Kafka011 0.11.0.2

示例


  1. create table datahub_input (
  2. messageKey VARBINARY,
  3. `message` VARBINARY
  4. ) with (type = 'random');
  5. create table sink_kafka (
  6. messageKey VARBINARY,
  7. `message` VARBINARY,
  8. PRIMARY KEY (messageKey)
  9. ) with (
  10. type = 'kafka010',
  11. topic = 'kafka_sink',
  12. `group.id` = 'CID_kafka_sink',
  13. bootstrap.servers = '192.168.116.8:9092,192.168.116.9:9092,192.168.116.10:9092'
  14. );
  15. INSERT INTO
  16. sink_kafka
  17. SELECT
  18. *
  19. FROM
  20. datahub_input;
本文转自实时计算——创建Kafka结果表
上一篇:Spring cloud 快速入门


下一篇:RabbitMQ安装