创建Kafka结果表
Kafka结果表需要定义的DDL如下。
create table sink_kafka (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka010',
topic = 'XXX',
`group.id` = 'XXX',
bootstrap.servers = 'ip1:port,ip2:port,ip3:port'
);
注意:
- 创建Kafka结果表时,必须显示的指定PRIMARY KEY (messageKey)。
- 无论是阿里云Kafka还是自建Kafka,目前实时计算计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。
WITH参数
通用配置
参数 | 注释说明 | 备注 |
---|---|---|
type | kafka对应版本 | 必选,必须是 KAFKA08、KAFKA09、KAFKA010、KAFKA011中的一种,版本对应关系见表格下方。 |
topic | 写入的topic | topic名称 |
必选配置
kafka08必选配置:
参数 | 注释说明 | 备注 |
---|---|---|
group.id | N/A | 消费组id |
zookeeper.connect | zk链接地址 | zk连接id |
kafka09/kafka010/kafka011必选配置:
参数 | 注释说明 | 备注 |
---|---|---|
group.id | N/A | 消费组id |
bootstrap.servers | Kafka集群地址 | Kafka集群地址 |
Kafka集群地址:
如果您的Kkafka是阿里云商业版,参见kafka商业版准备配置文档。
如果您的Kafka是阿里云公测版,参见kafka公测版准备配置文档。
可选配置参数
"consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"
注意:
以上参数请用户根据实际业务需要选择使用。
其它可选配置项参考kafka官方文档进行配置:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs
Kafka版本对应关系
type | Kafka 版本 |
---|---|
Kafka08 | 0.8.22 |
Kafka09 | 0.9.0.1 |
Kafka010 | 0.10.2.1 |
Kafka011 | 0.11.0.2 |
示例
create table datahub_input (
messageKey VARBINARY,
`message` VARBINARY
) with (type = 'random');
create table sink_kafka (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka010',
topic = 'kafka_sink',
`group.id` = 'CID_kafka_sink',
bootstrap.servers = '192.168.116.8:9092,192.168.116.9:9092,192.168.116.10:9092'
);
INSERT INTO
sink_kafka
SELECT
*
FROM
datahub_input;
本文转自实时计算——创建Kafka结果表