一. Task Configuration File
The ingestion type used is kafka:
{
"type": "kafka",
"dataSchema": {
"dimensionsSpec": {... ...},
"transformSpec":{... ...},
"metricsSpec":{... ...}
},
"tuningConfig": {... ...},
"ioConfig": {... ...}
}
Ⅰ). Data Schema
The dataSchema section has five parts: the table name (dataSource), the parser, the transform spec, the metrics spec, and the segment & query granularity.
"dataSchema": {
"dataSource": "druid_table_name",
"parser": {},
"transformSpec": {},
"metricsSpec": {}
}
a). Table Name
"dataSource": "druid_table_name"
b). Parser
The parser specifies the parser type, the data format, the timestamp field, and the dimension fields:
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"appName",
"nodeName"
],
"dimensionExclusions": []
}
}
}
c). Transform
Transforms mainly cover time conversion, expressions, and filtering; expression transforms can satisfy metric-counting needs over different value ranges, such as latency buckets:
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "time",
"expression": "timestamp_format(time,'yyyy-MM-dd HH:mm:ss.SSS')"
},
{
"type": "expression",
"name": "status",
"expression": "if(status, 1, 0)"
},
{
"type": "expression",
"name": "processRange1",
"expression": "if(processTime<=100, 1, 0)"
},
{
"type": "expression",
"name": "processRange2",
"expression": "if(processTime>100 && processTime<= 500, 1, 0)"
},
{
"type": "expression",
"name": "processRange3",
"expression": "if(processTime>500, 1, 0)"
}
]
},
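As a rough illustration (plain Python, not Druid's expression engine), the three range expressions above put every event into exactly one latency bucket:

```python
def process_ranges(process_time):
    # Mirrors the three Druid "expression" transforms above:
    # exactly one flag is 1 for any given processTime.
    return {
        "processRange1": 1 if process_time <= 100 else 0,
        "processRange2": 1 if 100 < process_time <= 500 else 0,
        "processRange3": 1 if process_time > 500 else 0,
    }
```

A longSum over each flag then yields the event count per latency bucket.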
d). Metrics
Metrics mainly use sum, max, min, and hyperUnique aggregators:
"metricsSpec": [
{
"name": "TotalTransCount",
"fieldName": "count",
"type": "longSum"
},
{
"name": "MaxProcessTime",
"fieldName": "processTime",
"type": "longMax"
},
{
"name": "MinProcessTime",
"fieldName": "processTime",
"type": "longMin"
},
{
"name": "TotalProcessRange1",
"fieldName": "processRange1",
"type": "longSum"
},
{
"name": "TotalProcessRange1",
"fieldName": "processRange2",
"type": "longSum"
},
{
"name": "TotalProcessRange1",
"fieldName": "processRange3",
"type": "longSum"
},
{
"name": "NodeName",
"fieldName": "nodeName",
"type": "hyperUnique"
}
]
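To make the aggregators concrete, here is a minimal Python sketch (an illustration with made-up events, not Druid internals) of what longSum, longMax, and longMin compute over events that roll up into one row:

```python
# Hypothetical raw events sharing the same dimensions and time bucket:
events = [
    {"count": 1, "processTime": 120},
    {"count": 1, "processTime": 340},
    {"count": 1, "processTime": 90},
]

rolled_up = {
    "TotalTransCount": sum(e["count"] for e in events),        # longSum
    "MaxProcessTime": max(e["processTime"] for e in events),   # longMax
    "MinProcessTime": min(e["processTime"] for e in events),   # longMin
}
```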
e). Segment & Query Granularity
Supported granularities include: all, none, second, minute, fifteen_minute, thirty_minute, hour, day, week, month, quarter, year.
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "HOUR"
}
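For intuition, queryGranularity HOUR means every event timestamp is truncated to the start of its hour before roll-up; a plain Python sketch (not Druid code):

```python
from datetime import datetime

def truncate_to_hour(ts):
    # HOUR query granularity: all events within the same hour share one bucket.
    return ts.replace(minute=0, second=0, microsecond=0)
```

Events at 14:05 and 14:59 therefore aggregate into the same 14:00 row, while segmentGranularity DAY controls how the stored segment files are partitioned by time.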
Ⅱ). Task and Segment Tuning
This section mainly configures the size of the segments produced and the number of worker threads for merge tasks:
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 500000,
"workerThreads": 2,
"reportParseExceptions": false
},
Ⅲ). IO Config
The ioConfig mainly covers the Kafka consumer settings and the task scheduling interval:
"ioConfig": {
"topic": "kafka_topic_name",
"consumerProperties": {
"bootstrap.servers": "hostname:9092"
},
"useEarliestOffset": false,
"taskCount": 3,
"replicas": 1,
"taskDuration": "PT1H"
}
Ⅳ). Post-aggregation
Configured at query time, when the business scenario needs secondary metrics computed on top of the stored measures. A query first names its aggregations, for example a simple count:
"aggregations":[
{
"type":"count",
"name":"count"
}
]
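For a genuinely derived metric, a query can also carry a postAggregations block. A hedged sketch (the source metric names come from the metricsSpec above; the derived name Range1Ratio is made up for illustration) computing the share of requests in the fastest latency bucket:

```json
"postAggregations": [
  {
    "type": "arithmetic",
    "name": "Range1Ratio",
    "fn": "/",
    "fields": [
      { "type": "fieldAccess", "fieldName": "TotalProcessRange1" },
      { "type": "fieldAccess", "fieldName": "TotalTransCount" }
    ]
  }
]
```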
二. Submitting the Task
With the task configuration saved as kafka-streaming.json, submit it with the following command:
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/kafka-streaming.json http://hostname:8081/druid/indexer/v1/supervisor
三. Common Problems
1. OffsetOutOfRangeException
Kafka has expired and cleaned up the old offsets ({"auto.offset.reset": "latest"}), while the indexing task runs with auto.offset.reset = none, meaning an out-of-range offset is not handled at all and an OffsetOutOfRangeException is thrown.
Solution:
Clear the druid_dataSource table in the metadata store, which records the partitions and offsets consumed for every Kafka topic, and restart the MiddleManager nodes.
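As a sketch only (the table name is taken from the text above, but the WHERE clause and datasource value are hypothetical; back up the metadata store and stop the affected supervisor first), the cleanup might look like:

```sql
-- Remove the recorded Kafka offsets for one datasource from the metadata store.
DELETE FROM druid_dataSource WHERE dataSource = 'druid_table_name';
```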
2. Time-zone Mismatch
The default time zone is UTC; the configuration needs changing:
a). The jvm.config of each service role:
-Duser.timezone=UTC+0800
b). The MiddleManager's runtime.properties:
# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx8g -Duser.timezone=UTC+0800 -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
3. Backfilling Data
If a task fails and Kafka data needs to be re-ingested, change the following setting in the ioConfig:
useEarliestOffset: true