How to implement an efficient deduplicating priority queue in Java

Background

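In Apache IoTDB, the final stage of a query joins on the timestamp column, and this step is carried out with a priority queue that removes duplicates automatically. The earlier implementation used Java's built-in TreeSet, but after running into the problems described below we wrote our own efficient deduplicating priority queue.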

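The Java standard library provides PriorityQueue as its default priority queue, implemented with generics for the sake of code reuse. Unlike in some other languages, however, Java generics work only with reference types, not with primitive types. For long, for example, PriorityQueue<Long> has to box every value into a Long object first, and add and poll then incur the corresponding boxing and unboxing. This wastes both CPU time and memory: a primitive long takes 8 bytes, whereas a Long object also carries an object header (typically 12 to 16 bytes on a 64-bit HotSpot JVM), and the queue stores a reference to the object on top of that.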
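To make the hidden conversions concrete, here is a minimal sketch that writes the same loop twice, once relying on autoboxing and once with the conversions spelled out. The class and method names (BoxingSketch, sumAutoboxed, sumExplicit) are made up for illustration and do not come from IoTDB.

```java
import java.util.PriorityQueue;

// Hypothetical illustration class, not part of IoTDB.
class BoxingSketch {

  // Relies on autoboxing: the compiler inserts the conversions silently.
  static long sumAutoboxed(long[] values) {
    PriorityQueue<Long> queue = new PriorityQueue<>();
    for (long v : values) {
      queue.add(v); // long -> Long happens here
    }
    long sum = 0;
    while (!queue.isEmpty()) {
      sum += queue.poll(); // Long -> long happens here
    }
    return sum;
  }

  // The same loop with the conversions written out by hand.
  static long sumExplicit(long[] values) {
    PriorityQueue<Long> queue = new PriorityQueue<>();
    for (long v : values) {
      queue.add(Long.valueOf(v)); // boxing: may allocate a new Long object
    }
    long sum = 0;
    while (!queue.isEmpty()) {
      sum += queue.poll().longValue(); // unboxing
    }
    return sum;
  }

  public static void main(String[] args) {
    long[] timestamps = {1_600_000_000_000L, 1_600_000_000_500L, 1_600_000_001_000L};
    // Both variants do the same work, so the sums agree.
    System.out.println(sumAutoboxed(timestamps) == sumExplicit(timestamps));
  }
}
```

Every element that passes through the queue goes through Long.valueOf on the way in and longValue on the way out; a heap stored directly in a long[] needs neither.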

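When we also need deduplication on top of the priority queue, that is, when the heap must never contain duplicate elements, the only class in the Java standard library that provides this is TreeSet. But TreeSet is implemented as a red-black tree, and the complex rotations it performs to keep the tree balanced and the data totally ordered are work we do not actually need. Like PriorityQueue, TreeSet is implemented with generics, so it carries the same boxing and unboxing overhead.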
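For reference, this is roughly how a TreeSet<Long> serves as a deduplicating priority queue. It is only a small sketch of the standard-library approach; the class name TreeSetDedupSketch and the helper drain are hypothetical.

```java
import java.util.Arrays;
import java.util.TreeSet;

// Hypothetical illustration class, not part of IoTDB.
class TreeSetDedupSketch {

  // Inserts all values, then removes them in ascending order.
  static long[] drain(long... values) {
    TreeSet<Long> set = new TreeSet<>();
    for (long v : values) {
      set.add(v); // returns false and leaves the set unchanged for a duplicate
    }
    long[] out = new long[set.size()];
    int n = 0;
    while (!set.isEmpty()) {
      out[n++] = set.pollFirst(); // removes and returns the smallest element
    }
    return out;
  }

  public static void main(String[] args) {
    // Each timestamp comes out once, in ascending order.
    System.out.println(Arrays.toString(drain(5, 3, 8, 3, 5, 8))); // prints [3, 5, 8]
  }
}
```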

Implementation

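Apache IoTDB implements a utility class named TimeSelector, located under server/src/main/java/org/apache/iotdb/db/utils/datastructure. It is essentially a binary heap stored in a plain long[], since timestamps are always of type long; depending on the use case, the long[] can be replaced with an array of another primitive type.
The duplicate check is built into the heap operations. In percolateUp, if the parent being compared equals the element being inserted, the method returns false immediately and the insertion is abandoned. In the code below, percolateDown itself contains no equality check; a duplicate that sits in a different branch of the heap is instead skipped on removal, where pollFirst and isEmpty compare the top of the heap against lastTime. Together these achieve the deduplication.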
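The idea can be sketched in a reduced form before looking at the real class. The following is a simplified, ascending-only illustration and not the IoTDB implementation: the class DedupLongHeap, its methods, and the boolean flag polled are all hypothetical, and it uses loops where TimeSelector uses recursion. It rejects a value that meets an equal ancestor during insertion, and on removal it skips any further copies of the value it returned last.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;

// Hypothetical, simplified sketch: an ascending min-heap over long[] that
// never returns the same value twice in a row.
class DedupLongHeap {
  private long[] heap = new long[8];
  private int size;
  private boolean polled; // true once poll() has returned a value
  private long last;      // the value most recently returned by poll()

  boolean add(long value) {
    // Find the slot where value would stop when sifting up from the next free leaf.
    int target = size;
    while (target > 0 && heap[(target - 1) >>> 1] > value) {
      target = (target - 1) >>> 1;
    }
    // Sifting stops at the first ancestor <= value; an equal one means a duplicate.
    if (target > 0 && heap[(target - 1) >>> 1] == value) {
      return false;
    }
    if (size == heap.length) {
      heap = Arrays.copyOf(heap, heap.length << 1);
    }
    // Shift the larger ancestors down one level each, then place value.
    int i = size++;
    while (i != target) {
      int parent = (i - 1) >>> 1;
      heap[i] = heap[parent];
      i = parent;
    }
    heap[i] = value;
    return true;
  }

  boolean isEmpty() {
    // Lazily drop copies of the last polled value that survived in other branches.
    while (size > 0 && polled && heap[0] == last) {
      removeRoot();
    }
    return size == 0;
  }

  long poll() {
    if (isEmpty()) {
      throw new NoSuchElementException();
    }
    last = heap[0];
    polled = true;
    removeRoot();
    return last;
  }

  private void removeRoot() {
    long moved = heap[--size]; // the last leaf takes the root's place
    int i = 0;
    while (true) {
      int child = 2 * i + 1;
      if (child >= size) {
        break;
      }
      if (child + 1 < size && heap[child + 1] < heap[child]) {
        child++; // pick the smaller of the two children
      }
      if (heap[child] >= moved) {
        break;
      }
      heap[i] = heap[child];
      i = child;
    }
    heap[i] = moved;
  }

  // Convenience helper for the example: add everything, then poll everything.
  static long[] drain(long... values) {
    DedupLongHeap h = new DedupLongHeap();
    for (long v : values) {
      h.add(v);
    }
    long[] out = new long[values.length];
    int n = 0;
    while (!h.isEmpty()) {
      out[n++] = h.poll();
    }
    return Arrays.copyOf(out, n);
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(drain(5, 3, 8, 3, 5, 8))); // prints [3, 5, 8]
  }
}
```

In this run the second 3 and the second 5 are rejected during insertion, while the second 8 lands in a different branch and is only discarded on removal. That is why the check during insertion needs the check on removal to back it up. One deliberate difference: the sketch tracks a boolean flag instead of initializing the last value to Long.MIN_VALUE as TimeSelector does.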

Talk is cheap, show me the code:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.utils.datastructure;

import java.util.Arrays;

public class TimeSelector {

  private static final int MIN_DEFAULT_CAPACITY = 8;

  private final boolean ascending;

  private long[] timeHeap;
  private int heapSize;
  private long lastTime;

  public TimeSelector(int defaultCapacity, boolean isAscending) {
    this.ascending = isAscending;
    timeHeap = new long[Math.max(defaultCapacity, MIN_DEFAULT_CAPACITY)];
    heapSize = 0;
    lastTime = Long.MIN_VALUE;
  }

  public boolean isEmpty() {
    while (heapSize != 0 && timeHeap[0] == lastTime) {
      timeHeap[0] = timeHeap[heapSize - 1];
      percolateDown(0, timeHeap[0]);
      --heapSize;
    }
    return heapSize == 0;
  }

  public void add(long time) {
    if (heapSize == 0) {
      timeHeap[0] = time;
    }
    if (percolateUp(heapSize, time)) {
      ++heapSize;
      checkExpansion();
    }
  }

  public long pollFirst() {
    long minTime = lastTime;

    while (minTime == lastTime) {
      minTime = timeHeap[0];

      timeHeap[0] = timeHeap[heapSize - 1];
      percolateDown(0, timeHeap[0]);
      --heapSize;
    }

    lastTime = minTime;
    return minTime;
  }

  private void checkExpansion() {
    if (heapSize == timeHeap.length) {
      timeHeap = Arrays.copyOf(timeHeap, timeHeap.length << 1);
    }
  }

  private boolean percolateUp(int index, long element) {
    if (index == 0) {
      return true;
    }

    int parentIndex = (index - 1) >>> 1;
    long parent = timeHeap[parentIndex];

    if (parent == element) {
      return false;
    } else if (ascending ? element < parent : parent < element) {
      timeHeap[index] = parent;
      timeHeap[parentIndex] = element;
      boolean isSuccessful = percolateUp(parentIndex, element);
      if (!isSuccessful) {
        timeHeap[index] = element;
        timeHeap[parentIndex] = parent;
      }
      return isSuccessful;
    } else { // ascending ? parent < element : element < parent
      timeHeap[index] = element;
      return true;
    }
  }

  private void percolateDown(int index, long element) {
    if (index == heapSize - 1) {
      return;
    }

    int childIndex = getSmallerChildIndex(index);

    if (childIndex != -1) {
      long child = timeHeap[childIndex];
      if (ascending ? child < element : element < child) {
        timeHeap[childIndex] = element;
        timeHeap[index] = child;
        percolateDown(childIndex, element);
      }
    }
  }

  /**
   * Calculates the children indexes for a given index and checks to see which one is smaller and
   * returns the index.
   *
   * @param index the given index
   * @return index of a smaller child or -1 if no children
   */
  private int getSmallerChildIndex(int index) {
    final int leftChildIndex = (index << 1) + 1;
    final int rightChildIndex = (index << 1) + 2;

    int smallerChildIndex;
    if (heapSize <= leftChildIndex) {
      smallerChildIndex = -1;
    } else if (heapSize <= rightChildIndex) {
      smallerChildIndex = leftChildIndex;
    } else {
      if (ascending) {
        smallerChildIndex =
            timeHeap[leftChildIndex] < timeHeap[rightChildIndex] ? leftChildIndex : rightChildIndex;
      } else {
        smallerChildIndex =
            timeHeap[leftChildIndex] < timeHeap[rightChildIndex] ? rightChildIndex : leftChildIndex;
      }
    }
    return smallerChildIndex;
  }

  @Override
  public String toString() {
    return Arrays.toString(this.timeHeap);
  }
}

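Experimental comparison

We compare the performance of PriorityQueue<Long>, TreeSet<Long>, and TimeSelector: each one inserts 10,000,000 random long values, then removes all of them and adds them up.

Experiment code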

https://github.com/JackieTien97/TimeSelector

package org.apache.iotdb.experiment;

import java.util.PriorityQueue;
import java.util.Random;
import java.util.TreeSet;

import org.apache.iotdb.db.utils.datastructure.TimeSelector;

public class Main {

    private static final int TOTAL_COUNT = 10_000_000;
    private static final long SEED = 123456;


    public static void main(String[] args) {
        // PriorityQueue version
        long startTime = System.nanoTime();
        Random random = new Random(SEED);
        PriorityQueue<Long> priorityQueue = new PriorityQueue<>(TOTAL_COUNT);
        for (int i = 0; i < TOTAL_COUNT; i++) {
            priorityQueue.add(random.nextLong());
        }
        long sum = 0;
        while (!priorityQueue.isEmpty()) {
            sum += priorityQueue.poll();
        }
        System.out.println("Sum is: " + sum);
        System.out.println("PriorityQueue cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");

        // TreeSet version
        startTime = System.nanoTime();
        random = new Random(SEED);
        TreeSet<Long> treeSet = new TreeSet<>();
        for (int i = 0; i < TOTAL_COUNT; i++) {
            treeSet.add(random.nextLong());
        }
        sum = 0;
        while (!treeSet.isEmpty()) {
            sum += treeSet.pollFirst();
        }
        System.out.println("Sum is: " + sum);
        System.out.println("TreeSet cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");

        // iotdb TimeSelector version
        startTime = System.nanoTime();
        random = new Random(SEED);
        TimeSelector selector = new TimeSelector(TOTAL_COUNT, true);
        for (int i = 0; i < TOTAL_COUNT; i++) {
            selector.add(random.nextLong());
        }
        sum = 0;
        while (!selector.isEmpty()) {
            sum += selector.pollFirst();
        }
        System.out.println("Sum is: " + sum);
        System.out.println("TimeSelector cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");
    }
}

Results

            PriorityQueue   TreeSet   TimeSelector
Time (ms)   9,727           11,105    7,350

As the table shows, PriorityQueue<Long> is about 32% slower than TimeSelector, and TreeSet<Long> is about 51% slower.
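The percentages follow directly from the timings above: each is the extra time taken relative to TimeSelector. A small sketch of the arithmetic, where the class and method names (SlowdownSketch, slowdownPercent) are hypothetical:

```java
// Hypothetical helper for checking the reported percentages.
class SlowdownSketch {

  // Extra time of the slower run, as a rounded percentage of the faster run.
  static long slowdownPercent(long slowerMs, long fasterMs) {
    return Math.round(100.0 * (slowerMs - fasterMs) / fasterMs);
  }

  public static void main(String[] args) {
    System.out.println(slowdownPercent(9_727, 7_350));  // PriorityQueue vs TimeSelector: prints 32
    System.out.println(slowdownPercent(11_105, 7_350)); // TreeSet vs TimeSelector: prints 51
  }
}
```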
