Background
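In Apache IoTDB, the final step of a query joins the results on the timestamp column, and this step is implemented with a priority queue that removes duplicates automatically. The previous implementation used Java's built-in `TreeSet`, but after running into the problems described below, we implemented our own efficient deduplicating priority queue.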
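The following minimal sketch shows what "joining on the timestamp column" involves. Several series each contribute an ascending list of timestamps, and the result needs a single ascending time column in which a timestamp shared by several series appears only once. The names `TimeColumnJoin` and `unionTimes` are hypothetical and this is not IoTDB's query code. It deliberately uses the JDK `PriorityQueue` plus a "skip if equal to the last emitted value" check, to show the job a deduplicating priority queue performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TimeColumnJoin {

  // Merges several ascending timestamp arrays into one ascending time column without duplicates.
  static List<Long> unionTimes(long[][] series) {
    // Each entry is {timestamp, seriesIndex, positionInSeries}, ordered by timestamp.
    PriorityQueue<long[]> heap = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    for (int s = 0; s < series.length; s++) {
      if (series[s].length > 0) {
        heap.add(new long[] {series[s][0], s, 0});
      }
    }
    List<Long> timeColumn = new ArrayList<>();
    while (!heap.isEmpty()) {
      long[] top = heap.poll();
      long time = top[0];
      int s = (int) top[1];
      int next = (int) top[2] + 1;
      // A timestamp present in several series must appear only once in the joined column.
      if (timeColumn.isEmpty() || timeColumn.get(timeColumn.size() - 1).longValue() != time) {
        timeColumn.add(time);
      }
      // Advance the series that produced this timestamp.
      if (next < series[s].length) {
        heap.add(new long[] {series[s][next], s, next});
      }
    }
    return timeColumn;
  }

  public static void main(String[] args) {
    long[][] series = {{1, 3, 5}, {2, 3, 6}, {3, 5}};
    System.out.println(unionTimes(series)); // [1, 2, 3, 5, 6]
  }
}
```

Timestamp 3 occurs in all three series and 5 in two of them, yet each appears once in the output.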
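The Java standard library provides `PriorityQueue` as its default priority queue implementation, and for code reuse `PriorityQueue` is implemented with generics. Unlike some other languages (C++ templates, for example), Java generics apply only to reference types, not to primitive types. For `long` values, a `PriorityQueue<Long>` must therefore box each value into a `Long` object on `add` and unbox it again on `poll`. This boxing and unboxing wastes both CPU time and memory: a `long` occupies 8 bytes, while a `Long` object also carries an object header (12 bytes on a typical 64-bit HotSpot JVM) plus alignment padding, and the queue's backing array additionally holds a reference to every such object.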
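The boxing does not show up in source code because javac inserts it automatically. The sketch below (hypothetical class `BoxingDemo`) passes timestamps through a `PriorityQueue<Long>`. Its comments name the calls javac generates for each `add` and `poll`.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

public class BoxingDemo {

  // Returns the timestamps in ascending order by passing them through a PriorityQueue<Long>.
  static long[] drain(long[] times) {
    // PriorityQueue rejects an initial capacity below 1.
    PriorityQueue<Long> queue = new PriorityQueue<>(Math.max(1, times.length));
    for (long t : times) {
      // Compiled as queue.add(Long.valueOf(t)). Outside the cached range -128..127 this
      // typically allocates a new Long object on the heap for every element.
      queue.add(t);
    }
    long[] out = new long[times.length];
    for (int i = 0; i < out.length; i++) {
      // Compiled as queue.poll().longValue(): the value is unboxed again on the way out.
      out[i] = queue.poll();
    }
    return out;
  }

  public static void main(String[] args) {
    long[] times = {1_700_000_000_300L, 1_700_000_000_100L, 1_700_000_000_200L};
    System.out.println(Arrays.toString(drain(times)));
  }
}
```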
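If the priority queue must also deduplicate (the heap should never contain duplicate elements), the only suitable class we found in the Java standard library is `TreeSet`. But `TreeSet` is backed by a red-black tree, and we do not need the rotations it performs to keep the tree balanced and all elements in total order: the join only ever asks for the current smallest (or largest) element. Like `PriorityQueue`, `TreeSet` is generic, so it carries the same boxing and unboxing overhead.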
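For comparison, this sketch follows the general pattern of using `TreeSet` as a deduplicating priority queue. The class `TreeSetDedupDemo` is hypothetical and is not the earlier IoTDB code. Every `add` boxes its value, and `pollFirst` removes the smallest element.

```java
import java.util.TreeSet;

public class TreeSetDedupDemo {

  // Returns the distinct timestamps in ascending order, using TreeSet as a deduplicating queue.
  static String drainDistinct(long[] times) {
    TreeSet<Long> set = new TreeSet<>(); // backed by a red-black tree (TreeMap)
    for (long t : times) {
      set.add(t); // boxed to Long; add returns false for a value already present
    }
    StringBuilder out = new StringBuilder();
    while (!set.isEmpty()) {
      out.append(set.pollFirst()).append(' '); // removes and returns the smallest element
    }
    return out.toString().trim();
  }

  public static void main(String[] args) {
    System.out.println(drainDistinct(new long[] {30, 10, 20, 10, 30})); // 10 20 30
  }
}
```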
Implementation
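Apache IoTDB implements this as a utility class, `TimeSelector`, under `server/src/main/java/org/apache/iotdb/db/utils/datastructure`. It is simply a binary heap stored in a built-in `long[]`, because timestamps are all of type `long`; for other scenarios, the `long[]` can be replaced with an array of another primitive type. The constructor chooses ascending order (smallest value first) or descending order (largest value first).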
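A primitive array is enough because a complete binary tree can be stored level by level with no child pointers. The node at index `i` has children at `2i+1` and `2i+2`, and its parent is at `(i-1)/2`. The sketch below (hypothetical class `HeapIndexDemo`) spells out this index arithmetic with the same shifts that `TimeSelector` uses in `percolateUp` and `getSmallerChildIndex`, and checks the ascending heap property.

```java
public class HeapIndexDemo {

  // Index arithmetic for a binary heap stored in a flat array.
  static int parent(int i) {
    return (i - 1) >>> 1;
  }

  static int left(int i) {
    return (i << 1) + 1;
  }

  static int right(int i) {
    return (i << 1) + 2;
  }

  // Checks the ascending (min-heap) property: every parent is <= each of its children.
  static boolean isMinHeap(long[] heap, int size) {
    for (int i = 1; i < size; i++) {
      if (heap[parent(i)] > heap[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Five timestamps in one long[]: no Long objects and no per-node tree objects or pointers.
    long[] heap = {100L, 300L, 200L, 700L, 400L};
    System.out.println(isMinHeap(heap, heap.length)); // true
    System.out.println(left(1) + " " + right(1) + " " + parent(4)); // 3 4 1
  }
}
```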
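Deduplication happens in `percolateUp`. While a new element moves up from the end of the heap, if it meets a parent equal to itself, `percolateUp` returns `false` and undoes the swaps it has made so far, so `add` leaves the heap unchanged. This check only covers the element's path to the root, so an equal value in another branch can still get in. Such leftovers are removed lazily: `lastTime` records the last value returned, and `isEmpty` and `pollFirst` discard any heap top equal to it. Since values leave the heap in sorted order, equal values always surface one after another, so the caller never sees a duplicate. `pollFirst` assumes a new value is still in the heap, which a preceding `isEmpty()` check guarantees.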
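The sketch below isolates this deduplication strategy in a hypothetical, ascending-only `PathDedupMinHeap`. It differs from `TimeSelector` in two deliberate ways. It checks the path in a first pass before moving anything, instead of swapping and rolling back. It also uses a `hasLast` flag instead of the `Long.MIN_VALUE` sentinel. In `main`, a second `3` is accepted because the existing `3` sits in another branch, a second `1` is rejected on its path, and the output is still duplicate-free.

```java
import java.util.Arrays;

public class PathDedupMinHeap {

  private long[] heap = new long[8];
  private int size = 0;
  // A flag (not a sentinel value) records whether anything has been returned yet.
  private boolean hasLast = false;
  private long last;

  // Inserts t unless an equal value lies on its path to the root; returns false if rejected.
  public boolean add(long t) {
    // Pass 1: walk up without moving anything. Once a parent is smaller than t, every ancestor
    // above it is smaller too, so no equal value can appear further up this path.
    for (int i = size; i > 0; i = (i - 1) >>> 1) {
      long parent = heap[(i - 1) >>> 1];
      if (parent == t) {
        return false;
      }
      if (parent < t) {
        break;
      }
    }
    // Pass 2: ordinary sift-up, shifting larger parents down into the hole.
    if (size == heap.length) {
      heap = Arrays.copyOf(heap, heap.length << 1);
    }
    int j = size++;
    while (j > 0 && heap[(j - 1) >>> 1] > t) {
      heap[j] = heap[(j - 1) >>> 1];
      j = (j - 1) >>> 1;
    }
    heap[j] = t;
    return true;
  }

  // Lazily drops duplicates that slipped in through another branch, then reports emptiness.
  public boolean isEmpty() {
    while (size > 0 && hasLast && heap[0] == last) {
      removeTop();
    }
    return size == 0;
  }

  // Returns the next distinct value; call isEmpty() first, as with TimeSelector.
  public long pollFirst() {
    last = removeTop();
    hasLast = true;
    return last;
  }

  // Removes the root and sifts the former last element down from the top.
  private long removeTop() {
    long top = heap[0];
    long moved = heap[--size];
    int i = 0;
    while (true) {
      int c = (i << 1) + 1;
      if (c >= size) {
        break;
      }
      if (c + 1 < size && heap[c + 1] < heap[c]) {
        c++;
      }
      if (heap[c] >= moved) {
        break;
      }
      heap[i] = heap[c];
      i = c;
    }
    heap[i] = moved;
    return top;
  }

  public static void main(String[] args) {
    PathDedupMinHeap h = new PathDedupMinHeap();
    StringBuilder added = new StringBuilder();
    for (long t : new long[] {1, 5, 3, 3, 5, 1}) {
      added.append(h.add(t)).append(' ');
    }
    System.out.println(added.toString().trim()); // true true true true true false
    StringBuilder out = new StringBuilder();
    while (!h.isEmpty()) {
      out.append(h.pollFirst()).append(' ');
    }
    System.out.println(out.toString().trim()); // 1 3 5
  }
}
```

Checking first and moving second avoids undoing swaps. The trade-off is that the path is walked twice when the insert succeeds.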
Talk is cheap, show me the code:
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.utils.datastructure;

import java.util.Arrays;

public class TimeSelector {

  private static final int MIN_DEFAULT_CAPACITY = 8;

  private final boolean ascending;
  // Binary heap stored flat: children of i are at 2i+1 and 2i+2, the parent at (i-1)/2.
  private long[] timeHeap;
  private int heapSize;
  // Last value returned by pollFirst(). Long.MIN_VALUE doubles as "nothing returned yet", so a
  // timestamp equal to Long.MIN_VALUE would be discarded as if it had already been returned.
  private long lastTime;

  public TimeSelector(int defaultCapacity, boolean isAscending) {
    this.ascending = isAscending;
    timeHeap = new long[Math.max(defaultCapacity, MIN_DEFAULT_CAPACITY)];
    heapSize = 0;
    lastTime = Long.MIN_VALUE;
  }

  /** Drops copies of the last returned value from the top of the heap, then reports emptiness. */
  public boolean isEmpty() {
    while (heapSize != 0 && timeHeap[0] == lastTime) {
      // Move the last element to the root and sift it down; the stale copy left in the last
      // slot is cut off by the decrement.
      timeHeap[0] = timeHeap[heapSize - 1];
      percolateDown(0, timeHeap[0]);
      --heapSize;
    }
    return heapSize == 0;
  }

  /** Inserts time unless an equal value is found on its path to the root. */
  public void add(long time) {
    if (heapSize == 0) {
      timeHeap[0] = time;
    }
    if (percolateUp(heapSize, time)) {
      ++heapSize;
      // Keep a free slot at index heapSize for the next add.
      checkExpansion();
    }
  }

  /**
   * Removes and returns the next distinct value. Call isEmpty() first: this method also skips
   * values equal to lastTime, but it assumes a new value is still left in the heap.
   */
  public long pollFirst() {
    long minTime = lastTime;
    while (minTime == lastTime) {
      minTime = timeHeap[0];
      timeHeap[0] = timeHeap[heapSize - 1];
      percolateDown(0, timeHeap[0]);
      --heapSize;
    }
    lastTime = minTime;
    return minTime;
  }

  private void checkExpansion() {
    if (heapSize == timeHeap.length) {
      timeHeap = Arrays.copyOf(timeHeap, timeHeap.length << 1);
    }
  }

  /**
   * Sifts element up from index. Returns false, after undoing its swaps, if an ancestor on the
   * path equals element.
   */
  private boolean percolateUp(int index, long element) {
    if (index == 0) {
      return true;
    }
    int parentIndex = (index - 1) >>> 1;
    long parent = timeHeap[parentIndex];
    if (parent == element) {
      return false;
    } else if (ascending ? element < parent : parent < element) {
      // element belongs above parent: swap them and keep climbing
      timeHeap[index] = parent;
      timeHeap[parentIndex] = element;
      boolean isSuccessful = percolateUp(parentIndex, element);
      if (!isSuccessful) {
        // a duplicate was found higher up: undo this level's swap
        timeHeap[index] = element;
        timeHeap[parentIndex] = parent;
      }
      return isSuccessful;
    } else { // ascending ? parent < element : element < parent
      timeHeap[index] = element;
      return true;
    }
  }

  /**
   * Sifts element down from index, swapping it with the child that should come first while that
   * child precedes it. No duplicate check is done here.
   */
  private void percolateDown(int index, long element) {
    if (index == heapSize - 1) {
      return;
    }
    int childIndex = getSmallerChildIndex(index);
    if (childIndex != -1) {
      long child = timeHeap[childIndex];
      if (ascending ? child < element : element < child) {
        timeHeap[childIndex] = element;
        timeHeap[index] = child;
        percolateDown(childIndex, element);
      }
    }
  }

  /**
   * Calculates the children indexes for a given index and returns the one that should come first:
   * the smaller child in ascending mode, the larger child in descending mode.
   *
   * @param index the given index
   * @return index of the child that should come first, or -1 if there are no children
   */
  private int getSmallerChildIndex(int index) {
    final int leftChildIndex = (index << 1) + 1;
    final int rightChildIndex = (index << 1) + 2;
    int smallerChildIndex;
    if (heapSize <= leftChildIndex) {
      smallerChildIndex = -1;
    } else if (heapSize <= rightChildIndex) {
      smallerChildIndex = leftChildIndex;
    } else {
      if (ascending) {
        smallerChildIndex =
            timeHeap[leftChildIndex] < timeHeap[rightChildIndex] ? leftChildIndex : rightChildIndex;
      } else {
        smallerChildIndex =
            timeHeap[leftChildIndex] < timeHeap[rightChildIndex] ? rightChildIndex : leftChildIndex;
      }
    }
    return smallerChildIndex;
  }

  // Prints the whole backing array, including slots at or beyond heapSize.
  @Override
  public String toString() {
    return Arrays.toString(this.timeHeap);
  }
}
```
Experimental Comparison
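We compared `PriorityQueue<Long>`, `TreeSet<Long>`, and `TimeSelector` by inserting 10,000,000 random `long` values into each, then removing all of them and summing them. All three use the same random seed, so they process the same input, and the sum acts as a checksum. `PriorityQueue<Long>` does not deduplicate, but duplicates among 10 million random `long` values are extremely unlikely, so the three sums should match. Each structure is timed once, one after another in the same JVM, and the time includes allocation and garbage collection.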
Experiment Code
https://github.com/JackieTien97/TimeSelector
```java
package org.apache.iotdb.experiment;

import java.util.PriorityQueue;
import java.util.Random;
import java.util.TreeSet;

// TimeSelector is assumed to be a copy of the class above placed in this package; otherwise add
// "import org.apache.iotdb.db.utils.datastructure.TimeSelector;".
public class Main {

  private static final int TOTAL_COUNT = 10_000_000;
  // Every version reseeds with the same value, so all three process the same input sequence.
  private static final long SEED = 123456;

  public static void main(String[] args) {
    // PriorityQueue<Long> version (boxes every value, no deduplication)
    long startTime = System.nanoTime();
    Random random = new Random(SEED);
    PriorityQueue<Long> priorityQueue = new PriorityQueue<>(TOTAL_COUNT);
    for (int i = 0; i < TOTAL_COUNT; i++) {
      priorityQueue.add(random.nextLong());
    }
    long sum = 0;
    while (!priorityQueue.isEmpty()) {
      sum += priorityQueue.poll();
    }
    System.out.println("Sum is: " + sum);
    System.out.println("PriorityQueue cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");

    // TreeSet<Long> version (boxes every value, deduplicates)
    startTime = System.nanoTime();
    random = new Random(SEED);
    TreeSet<Long> treeSet = new TreeSet<>();
    for (int i = 0; i < TOTAL_COUNT; i++) {
      treeSet.add(random.nextLong());
    }
    sum = 0;
    while (!treeSet.isEmpty()) {
      sum += treeSet.pollFirst();
    }
    System.out.println("Sum is: " + sum);
    System.out.println("TreeSet cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");

    // IoTDB TimeSelector version (primitive long[], deduplicates, ascending order)
    startTime = System.nanoTime();
    random = new Random(SEED);
    TimeSelector selector = new TimeSelector(TOTAL_COUNT, true);
    for (int i = 0; i < TOTAL_COUNT; i++) {
      selector.add(random.nextLong());
    }
    sum = 0;
    // isEmpty() must be checked before each pollFirst()
    while (!selector.isEmpty()) {
      sum += selector.pollFirst();
    }
    System.out.println("Sum is: " + sum);
    System.out.println("TimeSelector cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");
  }
}
```
Results
| | `PriorityQueue<Long>` | `TreeSet<Long>` | `TimeSelector` |
|---|---|---|---|
| Time (ms) | 9,727 | 11,105 | 7,350 |
As the table shows, `PriorityQueue<Long>` is 32% slower than `TimeSelector`, and `TreeSet<Long>` is 51% slower.
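These percentages are measured relative to `TimeSelector`'s time, using the figures from the table:

$$
\frac{9727}{7350} - 1 \approx 0.323, \qquad \frac{11105}{7350} - 1 \approx 0.511
$$

Expressed the other way round, `TimeSelector` takes about 24% less time than `PriorityQueue<Long>` ($1 - 7350/9727 \approx 0.244$) and about 34% less than `TreeSet<Long>` ($1 - 7350/11105 \approx 0.338$).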