Python Priority Queues with heapq

Table of Contents

1. Introduction to Priority Queues

Priority queues are abstract data structures that maintain elements with associated priorities and allow efficient access to the element with the highest (or lowest) priority. Python's heapq module implements a min heap, where the smallest element always has the highest priority.

2. Setting Up Our Environment

Let's start by importing the necessary modules and setting up our workspace.

import heapq
from typing import Any, List, Tuple
import itertools
from dataclasses import dataclass, field

print("Environment ready!")

3. Basic Heap Operations

3.1. Creating and Adding Elements

A heap in Python is simply a list that maintains the heap property: for any index i, the children of element i are at indices 2i + 1 and 2i + 2.

# Initialize an empty heap
heap = []

# Add elements using heappush
heapq.heappush(heap, 5)
heapq.heappush(heap, 3)
heapq.heappush(heap, 7)
heapq.heappush(heap, 1)

print("Heap after pushing elements:", heap)
print("Smallest element (heap[0]):", heap[0])

3.2. Removing Elements

The heappop operation removes and returns the smallest element while maintaining the heap property.

# Pop elements one by one
smallest = heapq.heappop(heap)
print("Popped smallest element:", smallest)
print("Heap after pop:", heap)

3.3. Heap Sort Implementation

We can implement a simple sorting algorithm using heap operations.

def heapsort(iterable: List[Any]) -> List[Any]:
    h = []
    # Push all elements onto heap
    for value in iterable:
        heapq.heappush(h, value)
    # Pop elements in order
    return [heapq.heappop(h) for _ in range(len(h))]

# Test the heapsort function
numbers = [5, 2, 8, 1, 9, 3, 7, 4, 6]
sorted_numbers = heapsort(numbers)
print("Original numbers:", numbers)
print("Sorted numbers:", sorted_numbers)

4. Advanced Priority Queue Implementation

4.1. Task Priority Queue with Entry Tracking

Let's implement a more sophisticated priority queue that can handle task updates and removals.

class PriorityQueue:
    def __init__(self):
        self.pq = []  # list of entries arranged in a heap
        self.entry_finder = {}  # mapping of tasks to entries
        self.REMOVED = '<removed-task>'  # placeholder for removed tasks
        self.counter = itertools.count()  # unique sequence count

    def add_task(self, task: str, priority: int = 0) -> None:
        """Add a new task or update the priority of an existing task."""
        if task in self.entry_finder:
            self.remove_task(task)
        count = next(self.counter)
        entry = [priority, count, task]
        self.entry_finder[task] = entry
        heapq.heappush(self.pq, entry)

    def remove_task(self, task: str) -> None:
        """Mark an existing task as REMOVED."""
        entry = self.entry_finder.pop(task)
        entry[-1] = self.REMOVED

    def pop_task(self) -> str:
        """Remove and return the lowest priority task."""
        while self.pq:
            priority, count, task = heapq.heappop(self.pq)
            if task is not self.REMOVED:
                del self.entry_finder[task]
                return task
        raise KeyError('pop from an empty priority queue')

# Test the PriorityQueue
pq = PriorityQueue()
pq.add_task("write code", 5)
pq.add_task("release product", 7)
pq.add_task("write spec", 1)
pq.add_task("create tests", 3)

print("\nProcessing tasks in priority order:")
try:
    while True:
        task = pq.pop_task()
        print(f"Processing task: {task}")
except KeyError:
    print("All tasks completed!")

5. Using Dataclasses for Priority Items

A cleaner approach using Python's dataclasses for priority queue entries.

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any = field(compare=False)

# Example usage
def process_items_with_dataclass():
    # Create a heap of PrioritizedItem objects
    items = []
    heapq.heappush(items, PrioritizedItem(5, "write code"))
    heapq.heappush(items, PrioritizedItem(7, "release product"))
    heapq.heappush(items, PrioritizedItem(1, "write spec"))
    heapq.heappush(items, PrioritizedItem(3, "create tests"))

    print("\nProcessing items using dataclass:")
    while items:
        item = heapq.heappop(items)
        print(f"Processing: {item.item} (priority: {item.priority})")

process_items_with_dataclass()

6. Practical Exercise: Event Scheduler

Implement a simple event scheduler using heapq.

from time import time

class Event:
    def __init__(self, time: float, callback: str):
        self.time = time
        self.callback = callback

    def __lt__(self, other: 'Event') -> bool:
        return self.time < other.time

class Scheduler:
    def __init__(self):
        self.events = []

    def schedule(self, delay: float, callback: str):
        """Schedule an event to occur after 'delay' seconds."""
        event_time = time() + delay
        heapq.heappush(self.events, Event(event_time, callback))

    def run(self):
        """Run all events that are due."""
        current_time = time()
        while self.events and self.events[0].time <= current_time:
            event = heapq.heappop(self.events)
            print(f"Executing: {event.callback} at time {event.time}")

# Test the scheduler
scheduler = Scheduler()
scheduler.schedule(1, "Send email")
scheduler.schedule(0.5, "Update database")
scheduler.schedule(2, "Generate report")

print("\nScheduled events will execute in order of their scheduled time")
scheduler.run()

7. Conclusion

The heapq module provides efficient implementations of priority queue operations. Key points to remember:

  1. Heaps maintain the heap property where parent nodes are always smaller than their children
  2. Perfect for implementing priority queues and schedulers
  3. Operations have O(log n) complexity for push and pop
  4. Can be used with custom objects by implementing comparison methods

For more advanced usage, consider:

  • Using dataclasses for cleaner priority item implementation
  • Implementing custom comparison methods for complex objects
  • Adding error handling for edge cases
  • Using type hints for better code maintainability

8. References

  • Python Standard Library Documentation: heapq module
  • Python Dataclasses Documentation
  • Time Complexity Analysis of Heap Operations
print("\nTutorial completed successfully!")

9. Guile Scheme Implementation

Implementing a binary heap-based priority queue in Guile Scheme.

(define-module (priority-queue)
  #:export (make-priority-queue
            pq-insert!
            pq-extract-min!
            pq-peek
            pq-empty?))

;; Helper functions for heap operations
(define (parent i)
  (quotient (- i 1) 2))

(define (left i)
  (+ (* 2 i) 1))

(define (right i)
  (+ (* 2 i) 2))

;; Priority Queue implementation
(define (make-priority-queue)
  (vector))

(define (pq-empty? pq)
  (zero? (vector-length pq)))

(define (pq-peek pq)
  (if (pq-empty? pq)
      (error "Priority queue is empty")
      (vector-ref pq 0)))

(define (swap! pq i j)
  (let ((temp (vector-ref pq i)))
    (vector-set! pq i (vector-ref pq j))
    (vector-set! pq j temp)))

(define (sift-up! pq idx)
  (let ((parent-idx (parent idx)))
    (when (and (>= parent-idx 0)
               (> (vector-ref pq parent-idx)
                  (vector-ref pq idx)))
      (swap! pq idx parent-idx)
      (sift-up! pq parent-idx))))

(define (sift-down! pq len idx)
  (let* ((left-idx (left idx))
         (right-idx (right idx))
         (smallest idx))
    (when (and (< left-idx len)
               (< (vector-ref pq left-idx)
                  (vector-ref pq smallest)))
      (set! smallest left-idx))
    (when (and (< right-idx len)
               (< (vector-ref pq right-idx)
                  (vector-ref pq smallest)))
      (set! smallest right-idx))
    (unless (= smallest idx)
      (swap! pq idx smallest)
      (sift-down! pq len smallest))))

(define (pq-insert! pq item)
  (let* ((len (vector-length pq))
         (new-pq (make-vector (+ len 1))))
    (let loop ((i 0))
      (when (< i len)
        (vector-set! new-pq i (vector-ref pq i))
        (loop (+ i 1))))
    (vector-set! new-pq len item)
    (sift-up! new-pq len)
    new-pq))

(define (pq-extract-min! pq)
  (if (pq-empty? pq)
      (error "Priority queue is empty")
      (let* ((len (vector-length pq))
             (min-val (vector-ref pq 0))
             (new-len (- len 1))
             (new-pq (make-vector new-len)))
        (vector-set! pq 0 (vector-ref pq new-len))
        (let loop ((i 0))
          (when (< i new-len)
            (vector-set! new-pq i (vector-ref pq i))
            (loop (+ i 1))))
        (when (> new-len 0)
          (sift-down! new-pq new-len 0))
        (values min-val new-pq))))

;; Example usage
(define (test-priority-queue)
  (let ((pq (make-priority-queue)))
    (set! pq (pq-insert! pq 5))
    (set! pq (pq-insert! pq 3))
    (set! pq (pq-insert! pq 7))
    (set! pq (pq-insert! pq 1))
    (display "Priority Queue: ")
    (display pq)
    (newline)
    (let-values (((min-val new-pq) (pq-extract-min! pq)))
      (set! pq new-pq)
      (display "Extracted minimum: ")
      (display min-val)
      (newline)
      (display "Updated Queue: ")
      (display pq)
      (newline))))

10. Clojure Implementation

Implementing a priority queue in Clojure using a binary heap.

(ns priority-queue.core
  (:refer-clojure :exclude [peek]))

(defprotocol PriorityQueue
  (insert [this item])
  (peek [this])
  (extract-min [this])
  (empty? [this]))

(deftype BinaryHeap [heap]
  PriorityQueue
  (empty? [_]
    (zero? (count heap)))
  
  (peek [_]
    (when (empty? heap)
      (throw (IllegalStateException. "Priority queue is empty")))
    (first heap))
  
  (insert [_ item]
    (letfn [(sift-up [h]
              (loop [idx (dec (count h))]
                (let [parent-idx (quot (dec idx) 2)]
                  (if (and (pos? idx)
                          (< (nth h idx) (nth h parent-idx)))
                    (recur parent-idx
                           (assoc h
                                 parent-idx (nth h idx)
                                 idx (nth h parent-idx)))
                    h))))]
      (BinaryHeap. (sift-up (conj heap item)))))
  
  (extract-min [this]
    (when (empty? heap)
      (throw (IllegalStateException. "Priority queue is empty")))
    (letfn [(sift-down [h]
              (loop [idx 0]
                (let [len (count h)
                      left-idx (inc (* 2 idx))
                      right-idx (+ 2 (* 2 idx))
                      smallest (cond
                               (and (< left-idx len)
                                    (< (nth h left-idx) (nth h idx)))
                               left-idx
                               
                               :else idx)]
                  (let [smallest (cond
                                 (and (< right-idx len)
                                      (< (nth h right-idx) (nth h smallest)))
                                 right-idx
                                 
                                 :else smallest)]
                    (if (= smallest idx)
                      h
                      (recur (assoc h
                                   idx (nth h smallest)
                                   smallest (nth h idx))))))))]
      (if (= 1 (count heap))
        [(first heap) (BinaryHeap. [])]
        [(first heap)
         (BinaryHeap. (sift-down (assoc (pop heap) 0 (last heap))))]))))

;; Constructor function
(defn make-priority-queue []
  (BinaryHeap. []))

;; Example usage
(defn test-priority-queue []
  (let [pq (-> (make-priority-queue)
               (insert 5)
               (insert 3)
               (insert 7)
               (insert 1))]
    (println "Initial queue:" (.heap pq))
    (let [[min-val new-pq] (extract-min pq)]
      (println "Extracted minimum:" min-val)
      (println "Updated queue:" (.heap new-pq)))))

;; Task scheduling example
(defrecord Task [priority description])

(defn make-task [priority description]
  (Task. priority description))

(defn schedule-tasks []
  (let [pq (-> (make-priority-queue)
               (insert (make-task 3 "Write tests"))
               (insert (make-task 1 "Fix critical bug"))
               (insert (make-task 2 "Update documentation")))]
    (loop [current-pq pq]
      (when-not (empty? current-pq)
        (let [[task new-pq] (extract-min current-pq)]
          (println "Processing task:" (:description task)
                   "with priority:" (:priority task))
          (recur new-pq))))))

11. Implementation Comparison

Let's compare the three implementations:

11.1. Python (heapq)

  • Uses a list as the underlying data structure
  • Provides built-in heap operations
  • Maintains min-heap property automatically
  • Memory efficient due to array-based implementation
  • Best for general use cases due to built-in optimization

11.2. Guile Scheme

  • Uses vectors for O(1) random access
  • Implements classical heap algorithms
  • More verbose due to manual memory management
  • Good for educational purposes and understanding heap mechanics
  • Functional programming style with explicit state management

11.3. Clojure

  • Leverages persistent data structures
  • Implements a proper type system with protocols
  • Immutable by default, returning new queue instances
  • More complex but thread-safe implementation
  • Ideal for concurrent applications

Common characteristics:

  1. All maintain O(log n) complexity for insert and extract operations
  2. All implement the core priority queue operations:
    • Insert/Push
    • Extract-min/Pop
    • Peek
    • Empty check
  3. All use binary heap as the underlying data structure
print("\nExtended tutorial completed successfully!")

Author: Training Materials

jwalsh@nexus

Last Updated: 2025-07-30 13:45:28

build: 2025-12-23 09:12 | sha: e32f33e