Confluent Platform: Real-Time Stream Processing with ksqlDB (quick start)

1, Introduction to Confluent Platform

  • Confluent Platform is a full-scale event streaming platform that lets you easily access, store, and manage data as continuous, real-time streams. Built by the original creators of Apache Kafka®, Confluent extends Kafka's benefits with enterprise-grade features while removing the burden of Kafka management and monitoring.
  • By integrating historical and real-time data into a single, central source of truth, Confluent makes it easy to build an entirely new category of modern event-driven applications, gain a universal data pipeline, and unlock powerful new use cases with full scalability, performance, and reliability.
  • Confluent Platform lets you focus on deriving business value from your data instead of worrying about underlying mechanics, such as how data is transported or integrated between disparate systems. Specifically, it simplifies connecting data sources to Kafka, building streaming applications, and securing, monitoring, and managing your Kafka infrastructure.
  • At its core, Confluent Platform is built on Apache Kafka, the most popular open-source distributed streaming platform.

Features

  • Connect to other data systems: Confluent connectors, built on the Kafka Connect API, link Kafka to external systems such as databases, key-value stores, search indexes, and file systems.
  • ETL and cached/materialized views (pre-computing query results to speed up reads): the ksqlDB component is a streaming SQL engine for Kafka. It offers an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka, with no need to write code in a programming language such as Java or Python. ksqlDB is scalable, elastic, fault-tolerant, and real-time, and supports a wide range of streaming operations, including filtering, transformation, aggregation, joins, windowing, and sessionization (a quick taste follows below; full, runnable examples are in section c).
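
For a quick taste of the SQL interface, here is a minimal sketch of a windowed aggregation (the stream and columns are hypothetical at this point; concrete examples follow in section c):

# count page views per user over 1-minute tumbling windows, streaming results as they change
ksql> SELECT userid, COUNT(*) AS views
        FROM pageviews
        WINDOW TUMBLING (SIZE 1 MINUTE)
        GROUP BY userid
        EMIT CHANGES;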

2, Quick deployment: quick start

Confluent Platform quickstart: https://docs.confluent.io/platform/current/quickstart/ce-quickstart.html

 Category                  | Command
---------------------------|--------------------------------
 Start services            | confluent local services start
 Stop services             | confluent local services stop
 Destroy services and data | confluent local destroy

a, Extract, install, and start the services

#1, Extract the archive and configure environment variables
[root@c7-docker confluent-6.1.1]# export CONFLUENT_HOME=/opt/confluent-6.1.1/
[root@c7-docker confluent-6.1.1]# echo 'export CONFLUENT_HOME=/opt/confluent-6.1.1/' >>  /etc/profile
[root@c7-docker confluent-6.1.1]# export PATH=$PATH:$CONFLUENT_HOME/bin
[root@c7-docker confluent-6.1.1]# echo 'export PATH=$PATH:$CONFLUENT_HOME/bin' >> /etc/profile

#2, Install the Kafka connector kafka-connect-datagen
#connector doc:	      https://docs.confluent.io/home/connect/overview.html
#plugin install directory: /opt/confluent-6.1.1/share/confluent-hub-components
[root@c7-docker confluent-6.1.1]#  grep 'plugin.path' /opt/confluent-6.1.1/etc/ -r
/opt/confluent-6.1.1/etc/kafka/connect-standalone.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/kafka/connect-standalone.properties:plugin.path=/usr/share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/kafka/connect-distributed.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/kafka/connect-distributed.properties:plugin.path=/usr/share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/ksqldb/connect.properties:# plugin.path=
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-standalone.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-standalone.properties:plugin.path=share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-distributed.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-distributed.properties:plugin.path=share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/kafka-connect-replicator/replicator-connect-distributed.properties:#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
/opt/confluent-6.1.1/etc/kafka-connect-replicator/replicator-connect-standalone.properties:#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors

#Alternatively, build and install from source: git clone https://github.com/confluentinc/kafka-connect-datagen.git
# git checkout v0.4.0
# mvn clean package
# confluent-hub install target/components/packages/confluentinc-kafka-connect-datagen-0.4.0.zip 
### Usage: confluent-hub install : install a component from either Confluent Hub or from a local file
[root@c7-docker confluent-6.1.1]# confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://www.apache.org/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.5.0, provided by Confluent, Inc. from Confluent Hub and installing into /opt/confluent-6.1.1//share/confluent-hub-components
Adding installation directory to plugin path in the following files:
  /opt/confluent-6.1.1//etc/kafka/connect-distributed.properties
  /opt/confluent-6.1.1//etc/kafka/connect-standalone.properties
  /opt/confluent-6.1.1//etc/schema-registry/connect-avro-distributed.properties
  /opt/confluent-6.1.1//etc/schema-registry/connect-avro-standalone.properties
Completed
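
# Optional check: the plugin should now sit under the plugin path configured above
# (the exact directory name depends on the connector version):
[root@c7-docker confluent-6.1.1]# ls /opt/confluent-6.1.1/share/confluent-hub-components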

#3, Edit the config files (ksqlDB listens on localhost:8088 by default; change it so that SQL queries issued from a remote Control Center session, e.g. http://192.168.56.7:9021/, do not fail)
[root@c7-docker confluent-6.1.1]# cd etc/
[root@c7-docker etc]# ls
cli                       confluent-control-center-fe  confluent-metadata-service  kafka                     ksqldb
confluent-common          confluent-hub-client         confluent-rebalancer        kafka-connect-replicator  rest-utils
confluent-control-center  confluent-kafka-mqtt         confluent-security          kafka-rest                schema-registry
[root@c7-docker etc]# grep ':8088' * -r
confluent-control-center/control-center.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-minimal.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center-minimal.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-dev.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center-dev.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-production.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://ksql:8088
confluent-control-center/control-center-production.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
ksqldb/ksql-server.properties:#listeners=http://0.0.0.0:8088
ksqldb/ksql-server.properties:listeners=http://192.168.56.117:8088
ksqldb/ksql-server.properties:# listeners=http://[::]:8088
ksqldb/ksql-server.properties:# listeners=https://0.0.0.0:8088
ksqldb/ksql-production-server.properties:#listeners=http://0.0.0.0:8088
ksqldb/ksql-production-server.properties:listeners=http://192.168.56.117:8088
ksqldb/ksql-production-server.properties:# listeners=http://[::]:8088
ksqldb/ksql-production-server.properties:# listeners=https://0.0.0.0:8088
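
# These edits can also be scripted; a minimal sketch that appends the overrides
# (assumes the install path and the host IP 192.168.56.117 used throughout this walkthrough):
echo 'listeners=http://192.168.56.117:8088' >> /opt/confluent-6.1.1/etc/ksqldb/ksql-server.properties
echo 'listeners=http://192.168.56.117:8088' >> /opt/confluent-6.1.1/etc/ksqldb/ksql-production-server.properties
echo 'confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088' >> /opt/confluent-6.1.1/etc/confluent-control-center/control-center.properties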

#4, Start the services and check the logs
[root@c7-docker etc]# confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.007829
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
[root@c7-docker etc]# ls /tmp/confluent.007829
connect  control-center  kafka  kafka-rest  ksql-server  schema-registry  zookeeper
# Data and log files:
[root@c7-docker lib]# ls /tmp/confluent.007829/connect/
connect.properties  connect.stdout  data  logs
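
# The same CLI reports per-service status at any time:
[root@c7-docker lib]# confluent local services status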

b, Control Center UI: create topics and generate test data

  • Open http://xxx:9021 in a browser to use the web UI
  • Create the topics: pageviews, users
  • Install the Kafka connector (kafka-connect-datagen) and generate test data (a REST-API alternative to the UI is sketched after the two configs below)
#a, Generate test data for the pageviews topic in AVRO format:
{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "quickstart": "pageviews"
}

#b, Generate test data for the users topic in AVRO format:
{
  "name": "datagen-users",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "users",
  "max.interval": "1000",
  "quickstart": "users"
}
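
Both connectors can also be created without the UI by POSTing to the Kafka Connect REST API (port 8083 by default). Note that the REST payload nests the settings under a "config" key, unlike the flat form shown above; a minimal sketch for the first connector:

# submit the datagen-pageviews connector to the Connect worker
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "pageviews",
    "max.interval": "100",
    "quickstart": "pageviews"
  }
}'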

c, Using ksqlDB: inspect data / create tables and streams

ksql doc : https://docs.ksqldb.io/en/latest/concepts/streams/

  • Working with ksqlDB in the UI is equivalent to running the CLI: ksql http://localhost:8088
######################### Creating a table/stream with ksqlDB:
#1, Create a stream (the backing Kafka topic can be created automatically)
# ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
#       WITH (kafka_topic='locations', value_format='json', partitions=1);

#Parameter notes:
#kafka_topic - 
#	Name of the Kafka topic underlying the stream. In this case it will be automatically created because it doesn't exist yet, but streams may also be created over topics that already exist.
#value_format - 
#	Encoding of the messages stored in the Kafka topic. For JSON encoding, each row will be stored as a JSON object whose keys/values are column names/values.
#partitions - 
#	Number of partitions to create for the locations topic. Note that this parameter is not needed for topics that already exist.

#2, Run a push query over the stream: coordinates within 5 miles
# SELECT * FROM riderLocations WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
#   Then open a new session, insert rows, and watch the query above emit them in real time
# INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
# INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);

# ksql>  create stream users( rowkey int key, username varchar) with( KAFKA_TOPIC='users',VALUE_FORMAT='JSON');
# Message
#----------------
# Stream created
#----------------
#ksql> insert into users(username) values('a');
#ksql> insert into users(username) values('b');
#ksql> select 'hello,'+ username as greeting from users emit changes;
#+----------------------------------------------------------------------------------------------------------------------------------------------------------+
#|GREETING                                                                                                                                                  |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------+
#|hello,b                                                                                                                                                   |
#|hello,a                                                                                                                                                   |

# A stream for the pageviews topic:
ksql> CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

# A table for the users topic:
ksql> CREATE TABLE  users (id VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');

ksql> set 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql>  show topics;
 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 pageviews                   | 1          | 1
 users                       | 1          | 1
---------------------------------------------------------------

######################### 1, Non-persistent query
ksql> SELECT * FROM pageviews EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|VIEWTIME                                          |USERID                                            |PAGEID                                            |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|129321                                            |User_1                                            |Page_40                                           |
Limit Reached
Query terminated


ksql> SELECT * from  users EMIT CHANGES LIMIT 1;
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|ID                           |REGISTERTIME                 |USERID                       |REGIONID                     |GENDER                       |
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|User_5                       |1489800608800                |User_5                       |Region_5                     |MALE                         |
Limit Reached
Query terminated

######################### 2, Persistent query (as a stream): filter the pageviews stream for FEMALE users and save the results to the pageviews_female topic
ksql> SELECT users.id AS userid, pageid, regionid
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WHERE gender = 'FEMALE'
  EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_3                                            |Page_89                                           |Region_7                                          |
Limit Reached
Query terminated


ksql> CREATE STREAM pageviews_female AS 
  SELECT users.id AS userid, pageid, regionid
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WHERE gender = 'FEMALE'
  EMIT CHANGES;

ksql> SELECT * from  pageviews_female EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_8                                            |Page_97                                           |Region_4                                          |
Limit Reached
Query terminated
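
Each CREATE STREAM ... AS SELECT registers a persistent query that keeps running in the background until terminated. It can be listed and, if needed, stopped from the ksql prompt (the query ID below is illustrative; use the one printed by SHOW QUERIES):

ksql> SHOW QUERIES;
# terminate a persistent query by the ID shown above, e.g.:
ksql> TERMINATE CSAS_PAGEVIEWS_FEMALE_5;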

######################### 3, Persistent query: keep only rows whose regionid ends in 8 or 9 and save the results to the pageviews_enriched_r8_r9 topic
ksql>  SELECT * FROM pageviews_female
  WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
  EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_9                                            |Page_95                                           |Region_8                                          |
Limit Reached
Query terminated


ksql> CREATE STREAM pageviews_female_like_89
  WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', VALUE_FORMAT='AVRO')
  AS SELECT * FROM pageviews_female
  WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
  EMIT CHANGES;

ksql> SELECT * from  pageviews_female_like_89 EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID                                            |PAGEID                                            |REGIONID                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_8                                            |Page_67                                           |Region_8                                          |
Limit Reached
Query terminated

######################### 4, Persistent query: count rows in the pageviews stream per REGIONID and GENDER over 30-second tumbling windows, keep only groups with count > 1, and save the result as a table (backed by the pageviews_regions topic)
ksql> SELECT gender, regionid, COUNT(*) AS numusers
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WINDOW TUMBLING (SIZE 30 SECOND)
  GROUP BY gender, regionid
  HAVING COUNT(*) > 1 EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|GENDER                                            |REGIONID                                          |NUMUSERS                                          |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|FEMALE                                            |Region_6                                          |2                                                 |
Limit Reached
Query terminated


ksql> CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')
  AS SELECT gender, regionid, COUNT(*) AS numusers
  FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
  WINDOW TUMBLING (SIZE 30 SECOND)
  GROUP BY gender, regionid
  HAVING COUNT(*) > 1
  EMIT CHANGES;

ksql> SELECT * from  pageviews_regions EMIT CHANGES LIMIT 1;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|KSQL_COL_0                           |WINDOWSTART                          |WINDOWEND                            |NUMUSERS                             |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|OTHER|+|Region_3                     |1623913830000                        |1623913860000                        |3                                    |
Limit Reached
Query terminated
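
Unlike the push queries above (EMIT CHANGES), a materialized table such as pageviews_regions can also serve pull queries, which look up the current value for a key and return immediately. A minimal sketch, assuming a key observed in the output above (the generated key column KSQL_COL_0 and its '|+|' separator come from grouping by two columns in this ksqlDB version):

# point-in-time lookup against the materialized windowed table; no EMIT CHANGES needed
ksql> SELECT * FROM pageviews_regions WHERE KSQL_COL_0 = 'OTHER|+|Region_3';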