上篇文章已经成功集成了Trino和CarbonData,这篇文章主要是测试我们的功能和性能方面的一些测试,测试出CarbonData相对于社区其他的一些文件格式的优势以及验证集成性能和可靠性,包括是否能正确启动集群,正确识别Carbondata文件进行读取数据做count,聚合分析等
建表语句:
/*
* create table by user pay list
*/
CREATE TABLE IF NOT EXIST CARBONDATA_PAYMENT (
`gameId` int,
`unionId` int,
`orderId` STRING,
`userId` INT,
`createTime` bigint,
`payTime` bigint,
`status` int,
`money` int
) stored as carbondata
生成测试数据的Spark代码:
package org.apache.carbondata.trino
import org.apache.spark.SparkContext
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.SparkSession
import scala.util.Random
/**
* @Description Build Test Data Store AS CarbonData
* @Author chenzhengyu
* @Date 2021-09-10 20:37
*/
object TestDataFromCarbonData {
def main(args: Array[String]): Unit = {
val sc = new SparkContext();
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://localhost:9000/user/hive/warehouse")
//build random num
val randomGameId = new Random(5);
val randomMoney = new Random();
import spark.implicits._
//build data
val dfTemp = spark.sparkContext.parallelize(1 to 1000000000).map(x => (randomGameId.nextInt(5), x % 8, "20210920" + (x % 100), x % 5, System.currentTimeMillis() + x % 500L, System.currentTimeMillis() + x % 500L, randomGameId.nextInt(4), randomMoney.nextInt(1000))).toDF("gameId", "unionId", "orderId", "userId", "createTime", "payTime", "status", "money")
//create temp view
dfTemp.createOrReplaceTempView("temp")
//create table
carbon.sql("CREATE TABLE CARBONDATA_PAYMENT (`gameId` int, `unionId` int, `orderId` STRING, `userId` INT, `createTime` bigint, `payTime` bigint,`status` int,`money` int ) stored as carbondata")
//show tables
carbon.sql("show tables").show
//insert data and select
spark.sql("insert into table CARBONDATA_PAYMENT select * from temp")
spark.sql("select count(1) from CARBONDATA_PAYMENT").show()
sc.stop()
}
}
这一部分代码我们可以使用Spark-Shell或者使用打包工具打包成JAR,通过Standalone模式或者YARN模式进行执行,执行的流程如下:
Spark框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。
如上图所示,它展示了一个 Spark执行时的基本结构。图形中的Driver表示master,负责管理整个集群中的作业任务调度。图形中的Executor 则是 slave,负责实际执行任务
我一共插入了大概10亿条数据左右,平均每个分区插入2.5E条数据:
生成的文件在HIVE目录下,观察如下:
通过Trino的Client进行查询 SELCT COUNT(1) AS TOTAL FROM CARBONDATA_PAYMENT
我们一共生成了 10E条数据
接下来我们做聚合查询,通过查询。我们通过模拟数据一共5个用户id,然后统计他们的总充值金额进行查询。
SELECT USERID,SUM(MONEY) AS TOTAL FROM CARBONDATA_PAYMENT GROUP BY USERID
结果如下图
这个流程针上面我做一个简单的总结:首先我们通过Spark 集成 Carbondata成功后,去生成Carbondata文件,我们通过SparkSession 这个类去生成一个临时会话,然后通过Spark RDD 去模拟一些游戏玩家订单充值数据,然后注册为临时表Temp。通过在Spark上连接Hive元数据库创建Carondata表,名字为CARBONDATA_PAYMENT的表。然后通过INSERT语句把刚刚的生成的数据插入到Hive表上,这个过程大概在10分钟左右(受限于本机磁盘和CPU性能影响),后续我们通过观察即可发现Hadoop的HDFS上已经写入了对应的数据了。接着我们就可以关闭Spark了,开始启动Presto(Trino)的服务端和客户端,按照配置文件配置好Hive的MetaStore地址和Catlog,我们使用Trino客户端连接进去。连接进去后我们通过网页打开控制台进行观察SQL的执行情况。然后我们执行简单查询看看是否能正常工作,检测完毕后,我们执行聚合查询,模拟查询根据玩家ID的总充值金额。然后在不同数据量(1w->10w->100w->1000w->1e->10e)的查询下的执行情况。然后我们再模拟查询某个玩家订单号去查询相关的详情数据情况,并且最后对此进行总结。与此同时,我们模拟超出内存超出执行的情况,保护查询避免集群出现OOM的机制
受限于本机性能发挥,基于刚刚按照导师要求进行的大规模数据测试结论如下:
- 基本查询、聚合查询等查询基本无问题,证明新版本Presto(Trino)集成成功
- 在做Count数据操作时候非常的快,基本1-2s可以出到结果,针对复杂的条件Count,如果有索引列也很快能查询出来
- 通过观察HDFS文件路径,可以发现Index文件是单独存储的一份文件,这样做的目的是提升数据读取效率
- 针对维度的查询明细数据,例如根据订单号进行查询,我们可以发现在10E数据中秒级返回查询的明细数据,这个速度是十分快的
- 如果在执行时候,超过允许最大执行内存,则会集群有停止执行的机制,避免内存oom