
Golang Priority Queue: Sorting at Scale

How do you find the top N selling items out of billions without sorting all of them? 🤔

Greetings friend 👋

A Priority Queue (we'll implement it from scratch) has a few interesting properties which make it indispensable for processing high volumes of data.

It's also known as Heap or Heap Queue and is often available in a language standard library. For example, Go defines a container/heap interface (pkg.go.dev/container/heap) and provides Heap operations on a conformant container.

The Priority Queue is often unfairly forgotten, though. We don't use it as much as other fundamental containers (arrays, lists and dicts), but it can be very useful in constrained environments with high-volume processing.

Properties

Recall that a Priority Queue has the following properties:

  1. Constant-time lookup of the minimal element (it's always at the front of the queue)
  2. LogN time for adding and removing elements (a heap is a binary tree)

Sort Complexity

We're all familiar with sort. Even if you're rusty on the actual algorithm, you know that most common sort implementations are:

  1. QuickSort – sorts in place (no extra memory) and runs in N*LogN on average
  2. MergeSort – requires N extra memory and runs in N*LogN as well

There are a few more, but this is what you'd see the most.

Now, tell me, what's your first instinct on how to get top 50 selling items out of a large unsorted list of them? 🙄🙋

100 giraffe spots
20  capybara toys
80  cat hugs
999 devops best practices
...

If you work on Unix, you'd likely instinctively write (and you'd be right!):

sort -n file.txt | tail -n 50

It's often convenient to do the same in application code as well.

However, sort sorts the entire list first, and you then only select the last few (or the first few, for the minimum) items.
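For reference, the full-sort baseline can be written in a few lines of Go; topNFullSort is just an illustrative name for this sketch:

```go
package main

import (
	"fmt"
	"sort"
)

// topNFullSort returns the n largest values by sorting the
// entire slice first -- the baseline we want to beat.
func topNFullSort(counts []int, n int) []int {
	sorted := append([]int(nil), counts...) // copy, keep the input intact
	sort.Ints(sorted)                       // N*LogN over ALL elements
	if n > len(sorted) {
		n = len(sorted)
	}
	return sorted[len(sorted)-n:] // take the tail, like `tail -n N`
}

func main() {
	counts := []int{100, 20, 80, 999}
	fmt.Println(topNFullSort(counts, 2)) // [100 999]
}
```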

Similarly, you can sort using a Priority Queue: first insert all elements into a heap, then remove them one by one, which yields ascending order.

This results in the same time complexity: N*LogN since you're doing N insert/delete operations and the cost of each operation is logarithmic.

Have you spotted the trick already? 😉

We often forget that it's actually N*LogM complexity:

  1. N is the total number of elements sorted
  2. M is the maximum number of elements held in the heap

These are equal when you do a HeapSort. And you can make significant performance gains if you're only interested in the top M elements.
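To put rough numbers on the gain: for a billion elements and a top-50 heap, log2(N) ≈ 30 while log2(M) ≈ 5.6, so a back-of-the-envelope estimate suggests roughly 5x fewer comparisons (speedup is a hypothetical helper for this sketch):

```go
package main

import (
	"fmt"
	"math"
)

// speedup estimates how many times fewer comparisons a size-m heap
// needs versus fully sorting n elements: N*Log2(N) / N*Log2(M).
func speedup(n, m float64) float64 {
	return (n * math.Log2(n)) / (n * math.Log2(m))
}

func main() {
	// top 50 out of a billion elements
	fmt.Printf("~%.1fx fewer comparisons\n", speedup(1e9, 50))
}
```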

Here's a graph of what we expect to see, N*LogN vs N*LogM:

[Figure: a line graph of N*LogN vs N*LogM — the Priority Queue's time approaches the Full Sort baseline as the top M approaches the full queue size N]

You can see a distinctive log shape. Now let's code this up and benchmark it!

Heap Implementation

We'll follow the most common and efficient Priority Queue implementation – array-packed binary tree.

Conveniently, the indices work out such that the left child of a node at position pos is always at index 2*pos and the right child is at 2*pos+1, for every node in the tree.

In this arrangement, the smallest element is always at the root of the tree, and child nodes are never smaller than their parent.

It remains then to implement addition and removal operations such that they preserve this invariant.

Note, for convenience and code clarity I'll waste the 0th element since packed positions start at 1, but you can also re-map them back into the 0-based positioning if needed.
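As a quick sanity check of the index math, here's a tiny hypothetical heap packed into a 1-based array:

```go
package main

import "fmt"

func main() {
	// a valid min-heap packed into a 1-based array; index 0 is unused
	data := []int{0, 1, 3, 2, 7, 4}

	for pos := 2; pos <= 5; pos++ {
		parent := pos / 2 // parentIdx: integer division walks up the tree
		fmt.Printf("data[%d]=%d has parent data[%d]=%d\n",
			pos, data[pos], parent, data[parent])
		if data[parent] > data[pos] {
			panic("heap invariant violated")
		}
	}
}
```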

Let's get our Go module started with:

mkdir go-heap && cd go-heap && go mod init codelab/heapq

And start an empty heap.go package file:

package heapq
...

Step 1: Container, Comparator and Go Generics

I'll allow any type of elements in the container even though we should technically enforce comparable and ordered.

However, the Ordered constraint hasn't made it into the standard library yet (it's still experimental), and I'm not a big fan of unnecessary dependencies in simple modules.

We can easily avoid this by taking a comparator function that tells us how to find a minimum of two elements.

type Heap[T any] struct { // accept Any type into the queue
	// compare two elements of type T and return true if first is smaller
	comparator Comparator[T]
	// number of elements in the priority queue
	size int
	// memory buffer of fixed capacity for storing the binary tree of items
	data []T
}

// returns true if a < b
type Comparator[T any] func(a *T, b *T) bool

func NewHeap[T any](capacity int, comparator Comparator[T]) *Heap[T] {
	return &Heap[T]{
		comparator: comparator,
		size:       0,
		data:       make([]T, capacity+1), // slot 0 is unused
	}
}

Step 2: Helper Functions

It would be useful to define a few helper functions to navigate tree nodes in an array:

func parentIdx(pos int) int {
	return pos / 2
}

func leftIdx(pos int) int {
	return pos * 2
}

func rightIdx(pos int) int {
	return pos*2 + 1
}

And a couple more helper methods: one to check whether we're already at a leaf of the tree and cannot descend further, and a swap for two nodes in the tree:

func (q *Heap[T]) isLeaf(pos int) bool {
	return leftIdx(pos) > q.size
}

func (q *Heap[T]) swap(a int, b int) {
	q.data[a], q.data[b] = q.data[b], q.data[a]
}

Step 3: Push / Peek / Pop

Now, Peek is the easy one: the minimal element is always at the root of the tree (element 1 in the array):

func (q *Heap[T]) Peek() (res T, err error) {
	if q.size < 1 {
		return res, fmt.Errorf("peeking into an empty queue")
	}

	res = q.data[1]
	return res, nil
}

Adding elements is easy – we add a new leaf at the bottom of the tree and keep "bubbling" it up while it's smaller than its parent:

func (q *Heap[T]) Push(item T) error {
	if q.size >= len(q.data)-1 { // slot 0 is unused, so capacity is len(q.data)-1
		return fmt.Errorf("pushing into a full container")
	}

	q.size++
	cur := q.size

	q.data[cur] = item
	for cur > 1 && q.comparator(&q.data[cur], &q.data[parentIdx(cur)]) {
		q.swap(cur, parentIdx(cur))
		cur = parentIdx(cur)
	}

	return nil
}

Removing is a little trickier: we take the first element (the root) out and need to find a good replacement among the child items below:

func (q *Heap[T]) Pop() (res T, err error) {
	if q.size < 1 {
		return res, fmt.Errorf("popping from an empty queue")
	}

	res = q.data[1]
	q.data[1] = q.data[q.size]
	q.size--
	q.percolate(1)

	return res, nil
}

func (q *Heap[T]) percolate(pos int) {
	if q.isLeaf(pos) {
		return
	}

	// pick the smaller of the two children (the right child may not exist)
	smallest := leftIdx(pos)
	if rightIdx(pos) <= q.size && q.comparator(&q.data[rightIdx(pos)], &q.data[smallest]) {
		smallest = rightIdx(pos)
	}

	// if the smaller child beats the current node, sink further down
	if q.comparator(&q.data[smallest], &q.data[pos]) {
		q.swap(pos, smallest)
		q.percolate(smallest)
	}
}

Go Benchmark

Here's a simple benchmark to test the top-M sort against the full-size sort baseline:

func benchmarkTopM(topm int, maxsize int, b *testing.B) {
	arr := make([]int, maxsize)
	hq := NewHeap[int](topm, func(a, b *int) bool { return b == nil || a != nil && *a < *b })
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		// random init
		for idx := 0; idx < len(arr); idx++ {
			arr[idx] = rand.Int()
		}
		b.StartTimer()
		for idx := 0; idx < len(arr); idx++ {
			if hq.size < topm {
				hq.Push(arr[idx])
			} else if min, _ := hq.Peek(); arr[idx] > min {
				// the new item beats the current top-M minimum
				hq.Pop()
				hq.Push(arr[idx])
			}
		}
		// drain the heap: pops come out in ascending order
		for hq.size > 0 {
			hq.Pop()
		}
	}
}

func TestTopRuntimes(t *testing.T) {
	file, err := os.Create("bench.csv")
	if err != nil {
		t.Fatal(err)
	}
	defer file.Close()

	fmt.Fprintln(file, "M size, heap topM, heap baseline")
	for topm := 1; topm < 1000; topm++ {
		fnHeap := func(b *testing.B) {
			benchmarkTopM(topm, 1000, b)
		}
		fnHeapBaseline := func(b *testing.B) {
			benchmarkTopM(1000, 1000, b)
		}
		rHeap := testing.Benchmark(fnHeap)
		rHeapBaseline := testing.Benchmark(fnHeapBaseline)

		fmt.Fprintln(file, topm, ",",
			int(rHeap.T)/rHeap.N, ",",
			int(rHeapBaseline.T)/rHeapBaseline.N)
	}
}

And here are the real-life results from my Intel NUC; they're a little noisy, as expected, and align with the model we saw at the start of this post:

What we've learnt

Hopefully, you see how useful a Priority Queue can be given the right use case :)

Let's recap:

  1. It's more CPU-time efficient than a sort | tail because it runs in N*LogM, where M < N, instead of the full N*LogN of a regular sort.
  2. It's more memory efficient: you only ever need to store M items in memory, so you can even process an infinite data stream online and have the top M items ready at any point.
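The streaming point above can be sketched as a top-M filter; here's a minimal self-contained version using the standard library's container/heap (topM and intHeap are hypothetical names for this illustration):

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// intHeap is a minimal min-heap of ints for container/heap.
type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topM keeps only the m largest values seen so far, so memory
// stays at m items no matter how long the stream runs.
func topM(stream []int, m int) []int {
	h := &intHeap{}
	for _, v := range stream {
		switch {
		case h.Len() < m:
			heap.Push(h, v)
		case v > (*h)[0]: // beats the smallest of the current top-m
			heap.Pop(h)
			heap.Push(h, v)
		}
	}
	res := append([]int(nil), *h...)
	sort.Sort(sort.Reverse(sort.IntSlice(res))) // best sellers first
	return res
}

func main() {
	sales := []int{100, 20, 80, 999, 5, 300}
	fmt.Println(topM(sales, 3)) // [999 300 100]
}
```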

That's it, thank you for reading!

You've been awesome! 🙌