InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。
InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一章介绍了一个SQL是怎样从字符串转换到物理执行计划的,详情见:
https://my.oschina.net/u/3374539/blog/5035628
这一章主要记录一下物理计划是怎样执行的。
在上一篇文章的末尾,我们展示了物理计划之中存储的数据,这些数据代表了当前整个数据库中,能够与用户输入的查询表相关联的所有数据。
对于一般数据库来讲,在物理计划中更应该是指向索引相关的信息,举例来说:select * from table1 ,在物理计划里,应该是要拿到table1的表描述、存储数据的文件路径、文件大小、等等,而不是拿到真实数据。在文章最末尾中,有一段省略的数据,为什么会出现数据呢?其实这是数据库设计的缓存,缓存的数据本来就没有落到磁盘上,所以直接在物理计划中也会持有RBChunk和MBChunk的数据引用。
对于一个过滤而言,会在物理计划中产生对应的信息,展示如下:
select * from myMeasurement where fieldKey like 'value1';
input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } }
接下来看物理计划的执行代码:
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
//单一块的时候直接取出数据
1 => {
let it = plan.execute(0).await?;
common::collect(it).await
}
//多个数据块的时候就需要进行合并数据
_ => {
let plan = MergeExec::new(plan.clone());
assert_eq!(1, plan.output_partitioning().partition_count());
//这里分为了两步execute 和 collect
common::collect(plan.execute(0).await?).await
}
}
}
接下来看plan.execute方法:
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
。。。省略
tokio::spawn(async move {
//这里的input就代表了上面展示的filter的input或者是数据的input
let mut stream = match input.execute(part_i).await {
Err(e) => {
let arrow_error = ArrowError::ExternalError(Box::new(e));
sender.send(Err(arrow_error)).await.ok();
return;
}
Ok(stream) => stream,
};
//计划执行完成之后返回一个stream,这里就是一直next获取完
while let Some(item) = stream.next().await {
sender.send(item).await.ok();
}
});
。。。省略
}
上面的input代表了以下这么多东西:
上面展示的为datafusion框架里的Plan,也就是通用sql都需要实现的功能,下面是iox项目中实现的Plan是完成数据获取的。
Plan之间的关系是嵌套的,想象一下上一章的大图,比如coalesceBatchesExec里可能还会包含filter,主要就是描述整个sql语句中都出现了什么。所有出现的plan就会对数据进行一次全面的过滤。
姑且不看过滤的细节,只看获取数据的部分(ExecutionPlan for IOxReadFilterNode)。
async fn execute(
&self,
partition: usize,
) -> datafusion::error::Result<SendableRecordBatchStream> {
//因为在前面物理计划中得到了所有列,这里拿出列的名字
let fields = self.schema.fields();
let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
//多个分区的时候可以根据分区号拿出chunk信息
let ChunkInfo {
chunk,
chunk_table_schema,
} = &self.chunk_and_infos[partition];
//过滤出来列名字对应的arrow的filed,这里就存在不对应的问题,假如用户输入了ABC,但是chunk_table_schema中并不存在,这里就会是一个空
let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
let selection = Selection::Some(&selection_cols);
//使用predicate过滤一次,但是我调试的时候一直是空的,也就是查询出所有数据。
let stream = chunk
.read_filter(&self.table_name, &self.predicate, selection)
.map_err(|e| {
DataFusionError::Execution(format!(
"Error creating scan for table {} chunk {}: {}",
self.table_name,
chunk.id(),
e
))
})?;
//这里使用SchemaAdapterStream的结构来填充空值列
let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema))
.map_err(|e| DataFusionError::Internal(e.to_string()))?;
Ok(Box::pin(adapter))
}
这个SchemaAdapterStream在代码中给了一个特别形象的描述:
///
/// ┌────────────────┐ ┌─────────────────────────┐
/// │ ┌─────┐┌─────┐ │ │ ┌─────┐┌──────┐┌─────┐ │
/// │ │ A ││ C │ │ │ │ A ││ B ││ C │ │
/// │ │ - ││ - │ │ │ │ - ││ - ││ - │ │
/// ┌──────────────┐ │ │ 1 ││ 10 │ │ ┌──────────────┐ │ │ 1 ││ NULL ││ 10 │ │
/// │ Input │ │ │ 2 ││ 20 │ │ │ Adapter │ │ │ 2 ││ NULL ││ 20 │ │
/// │ Stream ├────▶ │ │ 3 ││ 30 │ │────▶│ Stream ├───▶│ │ 3 ││ NULL ││ 30 │ │
/// └──────────────┘ │ │ 4 ││ 40 │ │ └──────────────┘ │ │ 4 ││ NULL ││ 40 │ │
/// │ └─────┘└─────┘ │ │ └─────┘└──────┘└─────┘ │
/// │ │ │ │
/// │ Record Batch │ │ Record Batch │
/// └────────────────┘ └─────────────────────────┘
///
接下来看如何实现数据查找的:
fn read_filter(
&self,
table_name: &str,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, Self::Error> {
//chunk存在变体,这里就是先判断是什么chunk,有三种MB,RB,ParquetFile
match self {
//还是在写入阶段的buffer,暂时不支持查询条件
Self::MutableBuffer { chunk, .. } => {
if !predicate.is_empty() {
return InternalPredicateNotSupported {
predicate: predicate.clone(),
}
.fail();
}
let batch = chunk
.read_filter(table_name, selection)
.context(MutableBufferChunk)?;
Ok(Box::pin(MemoryStream::new(vec![batch])))
}
//不可写阶段的buffer,对数据进行过滤
Self::ReadBuffer { chunk, .. } => {
let rb_predicate =
to_read_buffer_predicate(&predicate).context(PredicateConversion)?;
//读取数据并过滤
let read_results = chunk
.read_filter(table_name, rb_predicate, selection)
.context(ReadBufferChunkError {
chunk_id: chunk.id(),
})?;
//读取schema信息并过滤
let schema = chunk
.read_filter_table_schema(table_name, selection)
.context(ReadBufferChunkError {
chunk_id: chunk.id(),
})?;
//ReadFilterResultsStream是对不同的chunk类型实现的读取接口
Ok(Box::pin(ReadFilterResultsStream::new(
read_results,
schema.into(),
)))
}
//Parquet同理
Self::ParquetFile { chunk, .. } => chunk
.read_filter(table_name, predicate, selection)
.context(ParquetFileChunkError {
chunk_id: chunk.id(),
}),
}
}
数据到了这里就会按照你选择的表名、列名,将数据全部查询出来了。在代码中的predicate,一直是空的,暂时不确定是如何填充的,后面再看。
数据从这里全部查询出来之后,会返回给datafusion框架,继续按照开头写到的过滤器进行过滤,就是遍历一遍数据判断大于、小于或者like等等。
好了查询就先写到这里。
祝玩儿的开心!!
欢迎关注微信公众号:
或添加微信好友: liutaohua001