超大文件中在有限的内存里找到单词频率 top 100

问题

问题:有一个 1G 大小的文件,里面每一行是一个词,每个词的大小不超过 16 字节,内存限制大小是 1M。返回出现频率最高的 100 个单词

解法一:

第一步:将 1 G 拆分成小文件,每个小文件的大小不超过 1 MB ,其实应该留空间用于统计一个小文件中单词出现的频率,所以为了保险起见,每个文件的大小不超过 512 kb,那么得到 1 GB / 512 KB = 2048 个小文件

这里不用 hash 的方法来切分小文件,直接顺序读取然后写到小文件,小文件大小多到了 512kb 就关闭,写下一个小文件

第二步:分别对每个文件做下面的事情:
① 将小文件中的单词加载进内存
② 使用 HashMap 进行单词统计
③ 将 HashMap 中词频数据写到另一个新的小文件中,我们称为词频小文件
这里再写的时候,对单词进行 hash (word) % 2048 写到对应的文件中
这样做的目的是为了相同的单词放到同一个文件中。

第三步:初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词
分别对每个词频小文件做下面的事情:
① 将小文件中的单词及其频率加载进内存
② 使用 HashMap 进行单词统计
③ 遍历 HashMap 中单词词频,如果词频大于小顶堆中堆顶词频的话,则放入小顶堆,否则不放

最终,小顶堆中的 100 个单词就是出现频率最多的单词了

该方案在第二步的第 ③ 点的时候有点问题:就是当词频小文件大于 1 M 了,该怎么处理呢?
或者说极端情况下,每个单词都只出现一次,并且每个单词的 hash (word) % 2048 值都是相同的话,那词频小文件的大小都会超过 1 G 了

解法二:

第一步:使用多路归并排序对大文件进行排序,这样相同的单词肯定是挨着的

第二步:
① 初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词
② 遍历整个文件,一个单词一个单词的从文件中取出来,并计数
③ 等到遍历的单词和上一个单词不同的话,那么上一个单词及其频率如果大于堆顶的词的频率,那么放在堆中,否则不放

最终,小顶堆中就是出现频率前 100 的单词了

多路归并排序对大文件进行排序的步骤如下:
① 将文件按照顺序切分成大小不超过 512KB 的小文件,总共 2048 个小文件
② 使用 1MB 内存分别对 2048 个小文件中的单词进行排序
③ 使用一个大小为 2048 大小的堆,对 2048 个小文件进行多路排序,结果写到一个大文件中

代码实现

一:数据准备和工具类

package com.douma.practical.mult_line_merging;

import java.io.*;

/**
 * 抖码算法,让算法学习变的简单有趣
 *
 * @作者 : 老汤
 */
public class FileIOUtils {

    // 拿到指定文件的输入流,并包装成文件 BufferedReader
    public static BufferedReader getReader(String name) {
        try {
            FileInputStream inputStream = new FileInputStream(name);
            BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
            return br;
        } catch (IOException e) {
            throw new RuntimeException("IOException", e);
        }
    }

    // 拿到指定文件的输出流,并包装成文件 BufferedWriter
    public static BufferedWriter getWriter(String name) {
        try {
            FileOutputStream outputStream = new FileOutputStream(name);
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
            return bw;
        } catch (IOException e) {
            throw new RuntimeException("IOException", e);
        }
    }

    public static void closeReader(Reader reader) {
        try {
            if (reader != null) {
                reader.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("IOException", e);
        }
    }

    public static void closeWriter(Writer writer) {
        try {
            if (writer != null) {
                writer.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("IOException", e);
        }
    }
}


package com.douma.practical.mult_line_merging;

import java.io.*;
import java.util.Random;

/**
 * 准备阶段:准备 1G 的文件,文件中每一行一个单词,每个单词的大小小于等于 16 字节
 * 数据写道当前工程目录下的文件:data\top100\words.txt
 */
public class _0_WordsGenerator {
    private static Random r = new Random();

    public static void main(String[] args) throws IOException {

        BufferedWriter writer = FileIOUtils.getWriter("data\\top100\\words.txt");

        char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g'};
        int m = chars.length;

        for (int i = 0; i < 10000; i++) {
            StringBuilder line = new StringBuilder();
            for (int j = 0; j < r.nextInt(16); j++) {
                line.append(chars[r.nextInt(m)]);
            }
            if (line.length() == 0) continue;
            writer.write(line.toString());
            writer.newLine();
        }

        FileIOUtils.closeWriter(writer);
    }
}

二:将一个大文件切割成多个小文件

package com.douma.practical.mult_line_merging;

import java.io.*;

/**
 * 第一步:将一个大文件切割成多个小文件
 * 小文件的大小为 512kb 左右
 * 小文件都写到这个目录下:data\top100\raw_data\
 */
public class _1_FileSplit {

    public void splitFile(String fileName) throws IOException {
        int fileNum = 0;
        String fileSuffix = "data\\top100\\raw_data\\";
        String littleFileName = fileSuffix + fileNum;

        // 记录每个小文件的大小
        long totalSize = 0;

        BufferedWriter bw = FileIOUtils.getWriter(littleFileName);

        BufferedReader br = FileIOUtils.getReader(fileName);
        String line = null;
        while ((line = br.readLine()) != null) {
            // 如果当前小文件大小大于 512KB ,那么关闭当前小文件,写下一个小文件
            if (totalSize >= 512 * 1024) {
                // 关闭当前小文件的输出流
                FileIOUtils.closeWriter(bw);

                // 拿到下一个小文件的输出流,开始写下一个小文件
                fileNum++;
                littleFileName = fileSuffix + fileNum;
                bw = FileIOUtils.getWriter(littleFileName);
                totalSize = 0;
            }

            totalSize += line.length();

            bw.write(line);
            bw.newLine();
        }

        FileIOUtils.closeReader(br);
    }

    public static void main(String[] args) throws IOException {
        String fileName = "data\\top100\\words.txt";
        new _1_FileSplit().splitFile(fileName);
    }
}

三:将每个小文件中的单词进行排序

package com.douma.practical.mult_line_merging;

import java.io.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * 第二步:将每个小文件中的单词进行排序
 *  排序后的数据写到另一个目录:data\top100\sorted_data\
 */
public class _2_LittleFileSorter {

    public void sortEachFile(String dirName) throws IOException {
        File dir = new File(dirName);
        File[] littleFiles = dir.listFiles();

        // 排序每一个小文件中的单词
        for (int i = 0; i < littleFiles.length; i++) {
            // 1. 将当前小文件中单词读取到内存中
            BufferedReader br = FileIOUtils.getReader(littleFiles[i].getName());
            List<String> words = new ArrayList<>();
            String line = null;
            while ((line = br.readLine()) != null) {
                words.add(line);
            }

            FileIOUtils.closeReader(br);

            // 2. 对当前文件中的所有单词进行排序
            Collections.sort(words);

            // 3. 将排完序的单词写道目标文件中
            BufferedWriter bw = FileIOUtils.getWriter("data\\top100\\sorted_data\\" + i);
            for (String word : words) {
                bw.write(word);
                bw.newLine();
            }
            FileIOUtils.closeWriter(bw);
        }

    }

    public static void main(String[] args) throws IOException {
        String dir = "data\\top100\\raw_data";
        new _2_LittleFileSorter().sortEachFile(dir);
    }

}

四:对有序的小文件进行外部排序

我们先写一个 BufferedIterator 类,如下:

package com.douma.practical.mult_line_merging;

import java.io.BufferedReader;
import java.io.IOException;

// BufferedIterator 是对文件输入流 BufferedReader 的包装,用于:
//  ① 判断输入流中是否还有数据,有过有的话,则先缓存下一行数据
//  ② 得到文件输入流中的下一行数据
public class BufferedIterator {

    private BufferedReader reader;
    private String head;

    BufferedIterator(BufferedReader reader) {
        this.reader = reader;
    }

    public boolean hasNext() {
        try {
            head = this.reader.readLine();
        } catch (IOException e) {
            e.printStackTrace();
            head = null;
        }
        return head != null;
    }

    public String next() {
        return head;
    }

    public void close() throws Exception {
        this.reader.close();
    }
}

然后使用多路归并排序来实现文件的外部排序,如下代码:

package com.douma.practical.mult_line_merging;

import java.io.*;
import java.util.Comparator;
import java.util.PriorityQueue;

/**
 * 第三步: 对有序的小文件进行外部排序
 * 使用多路归并排序实现
 */
public class _3_ExternalSorter {

    public void mergeSort(String dirName) throws Exception {

        File dir = new File(dirName);
        File[] children = dir.listFiles();

        // 1. 初始化一个最小堆,大小就是有序小文件的个数
        // 堆中的每个节点存放每个有序小文件对应的输入流
        // BufferedIterator 是对文件输入流 BufferedReader 的包装,用于:
        //  ① 判断输入流中是否还有数据
        //  ② 得到文件输入流中的下一行数据
        PriorityQueue<BufferedIterator> minHeap = new PriorityQueue<>(children.length, new Comparator<BufferedIterator>() {
            @Override
            public int compare(BufferedIterator o1, BufferedIterator o2) {
                // 按照每个有序文件中的下一行数据对所有文件输入流进行排序
                // 单词小的输入文件流放在堆顶
                return o1.next().compareTo(o2.next());
            }
        });

        // 2. 将所有有序文件的输入流包装成 BufferedIterator 放入到堆中
        for (File file : children) {
            BufferedReader br = FileIOUtils.getReader(file.getName());
            BufferedIterator buf = new BufferedIterator(br);
            // 要有数据才放入到堆中,否则关闭流
            if (buf.hasNext()) {
                minHeap.add(buf);
            } else {
                buf.close();
            }
        }

        BufferedWriter bw = FileIOUtils.getWriter("data\\top100\\sorted_words.txt");
        // 3. 拿出堆顶的数据,并写入到最终排序的文件中
        while (!minHeap.isEmpty()) {
            BufferedIterator firstBuf = minHeap.poll();
            bw.write(firstBuf.next());
            bw.newLine();
            // 如果拿出来的输入流中还有数据的话,那么将这个输入流再一次添加到栈中,
            if (firstBuf.hasNext()) {
                minHeap.add(firstBuf);
            } else { // 否则说明该文件输入流中没有数据了,那么可以关闭这个流
                firstBuf.close();
            }
        }

       FileIOUtils.closeWriter(bw);
    }

    public static void main(String[] args) throws Exception {
        String dirName = "data\\top100\\sorted_data\\";
        new _3_ExternalSorter().mergeSort(dirName);
    }

}

五:利用 100 大小的小顶堆得到出现频率 top 100 的单词

package com.douma.practical.mult_line_merging;

import java.io.BufferedReader;
import java.util.Arrays;
import java.util.Comparator;
import java.util.PriorityQueue;

/**
 * 第四步:利用 100 大小的小顶堆得到出现频率 top 100 的单词
 * 到这里,经过外部排序后,文件中所有的单词都是有序的,相同的单词都是挨着的
 * 所以,可以顺序遍历有序的文件,得到出现频率 top 100 的单词
 */
public class _4_Top_100_Words {

    class Pair {
        String word;
        int cnt;

        Pair(String word, int cnt) {
            this.word = word;
            this.cnt = cnt;
        }
    }

    public String[] top_100 (String fileName) throws Exception {

        // 1. 初始化一个 100 大小的小顶堆
        // 堆中每个节点存放的是单词及其出现的次数
        PriorityQueue<Pair> minHeap = new PriorityQueue<>(100, new Comparator<Pair>() {
            @Override
            public int compare(Pair o1, Pair o2) {
                // 按照单词出现的次数排序
                // 出现次数多的,放在堆顶
                return o1.cnt - o2.cnt;
            }
        });


        // 前一个单词
        String prevWord = null;
        // 前一个单词出现的次数
        int prevCnt = 0;

        BufferedReader br = FileIOUtils.getReader(fileName);
        // 当前的单词
        String currWord = null;
        // 2. 读取有序文件中每一行的单词,然后统计其出现的次数,并保留出现次数 top 100 的单词
        while ((currWord = br.readLine()) != null) {
            // 如果当前的单词和前一个单词不是同一个单词
            // 那么需要处理前一个单词
            if (!currWord.equals(prevWord)) {
                // 如果堆的大小小于 100,那么直接将前一个单词及其出现的次数放入堆中
                if (minHeap.size() < 100) {
                    minHeap.add(new Pair(prevWord, prevCnt));
                } else if (prevCnt > minHeap.peek().cnt) { // 否则,如果前一个单词出现的次数大于堆顶单词出现的次数
                    // 先删除堆顶单词
                    minHeap.remove();
                    //将前一个单词及其出现的次数放入堆中
                    minHeap.add(new Pair(prevWord, prevCnt));
                }

                // 将当前单词设置为前一个单词
                prevWord = currWord;
                // 前一个单词出现的次数清零
                prevCnt = 0;
            }
            // 统计前一个单词出现的次数
            prevCnt++;
        }

        //3. 最终堆中的 100 个单词就是出现频率最高的单词了,输出即可
        String[] res = new String[100];
        int index = 0;
        while (!minHeap.isEmpty()) {
            res[index++] = minHeap.poll().word;
        }

        return res;
    }

    public static void main(String[] args) throws Exception {
        String fileName = "data\\top100\\sorted_words.txt";
        String[] res = new _4_Top_100_Words().top_100(fileName);
        System.out.println(Arrays.toString(res));
    }

}

上一篇:Connection strings in ADO.NET


下一篇:基于SpringBoot注解实现策略模式