The Algorithms logo
The Algorithms
AboutDonate

External Sort

p
#!/usr/bin/env python

#
# Sort large text files in a minimum amount of memory
#
import argparse
import os


class FileSplitter:
    BLOCK_FILENAME_FORMAT = "block_{0}.dat"

    def __init__(self, filename):
        self.filename = filename
        self.block_filenames = []

    def write_block(self, data, block_number):
        filename = self.BLOCK_FILENAME_FORMAT.format(block_number)
        with open(filename, "w") as file:
            file.write(data)
        self.block_filenames.append(filename)

    def get_block_filenames(self):
        return self.block_filenames

    def split(self, block_size, sort_key=None):
        i = 0
        with open(self.filename) as file:
            while True:
                lines = file.readlines(block_size)

                if lines == []:
                    break

                if sort_key is None:
                    lines.sort()
                else:
                    lines.sort(key=sort_key)

                self.write_block("".join(lines), i)
                i += 1

    def cleanup(self):
        map(os.remove, self.block_filenames)


class NWayMerge:
    def select(self, choices):
        min_index = -1
        min_str = None

        for i in range(len(choices)):
            if min_str is None or choices[i] < min_str:
                min_index = i

        return min_index


class FilesArray:
    def __init__(self, files):
        self.files = files
        self.empty = set()
        self.num_buffers = len(files)
        self.buffers = {i: None for i in range(self.num_buffers)}

    def get_dict(self):
        return {
            i: self.buffers[i] for i in range(self.num_buffers) if i not in self.empty
        }

    def refresh(self):
        for i in range(self.num_buffers):
            if self.buffers[i] is None and i not in self.empty:
                self.buffers[i] = self.files[i].readline()

                if self.buffers[i] == "":
                    self.empty.add(i)
                    self.files[i].close()

        if len(self.empty) == self.num_buffers:
            return False

        return True

    def unshift(self, index):
        value = self.buffers[index]
        self.buffers[index] = None

        return value


class FileMerger:
    def __init__(self, merge_strategy):
        self.merge_strategy = merge_strategy

    def merge(self, filenames, outfilename, buffer_size):
        buffers = FilesArray(self.get_file_handles(filenames, buffer_size))
        with open(outfilename, "w", buffer_size) as outfile:
            while buffers.refresh():
                min_index = self.merge_strategy.select(buffers.get_dict())
                outfile.write(buffers.unshift(min_index))

    def get_file_handles(self, filenames, buffer_size):
        files = {}

        for i in range(len(filenames)):
            files[i] = open(filenames[i], "r", buffer_size)  # noqa: UP015

        return files


class ExternalSort:
    def __init__(self, block_size):
        self.block_size = block_size

    def sort(self, filename, sort_key=None):
        num_blocks = self.get_number_blocks(filename, self.block_size)
        splitter = FileSplitter(filename)
        splitter.split(self.block_size, sort_key)

        merger = FileMerger(NWayMerge())
        buffer_size = self.block_size / (num_blocks + 1)
        merger.merge(splitter.get_block_filenames(), filename + ".out", buffer_size)

        splitter.cleanup()

    def get_number_blocks(self, filename, block_size):
        return (os.stat(filename).st_size / block_size) + 1


def parse_memory(string):
    if string[-1].lower() == "k":
        return int(string[:-1]) * 1024
    elif string[-1].lower() == "m":
        return int(string[:-1]) * 1024 * 1024
    elif string[-1].lower() == "g":
        return int(string[:-1]) * 1024 * 1024 * 1024
    else:
        return int(string)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-m", "--mem", help="amount of memory to use for sorting", default="100M"
    )
    parser.add_argument(
        "filename", metavar="<filename>", nargs=1, help="name of file to sort"
    )
    args = parser.parse_args()

    sorter = ExternalSort(parse_memory(args.mem))
    sorter.sort(args.filename[0])


if __name__ == "__main__":
    main()