halp 1.0 documentation

halp.utilities.priority_queue

Contents

Source code for halp.utilities.priority_queue

"""
.. module:: priority_queue
   :synopsis: Wrapper for python's heapq to provide some standard
            operations expected a priority queue.

"""
import heapq
import itertools


[docs]class PriorityQueue(object): """ Priority queue implementation based on the suggested "mark-as-invalid" technique by Python's heapq documentation: https://docs.python.org/3.4/library/heapq.html#priority-queue-implementation-notes :note: This implementation only allows unique elements. """ def __init__(self): """ Constructor for the PriorityQueue class. """ # pq: the priority queue list; heapq ensures that pq maintains # the heap invariant self.pq = [] # counter: unique sequence count (to resolve element orderings in the # case of equal priorities) self.counter = itertools.count(1) # element_finder: mapping of elements to entries (allows element # priority modification or deletion) self.element_finder = {} # INVALID: mark an entry as deleted (via assigning count of 0) self.INVALID = 0
[docs] def add_element(self, priority, element, count=None): """Adds an element with a specific priority. :param priority: priority of the element. :param element: element to add. """ if count is None: count = next(self.counter) entry = [priority, count, element] self.element_finder[element] = entry heapq.heappush(self.pq, entry)
[docs] def get_top_priority(self): """Pops the element that has the top (smallest) priority. :returns: element with the top (smallest) priority. :raises: IndexError -- Priority queue is empty. """ if self.is_empty(): raise IndexError("Priority queue is empty.") _, _, element = heapq.heappop(self.pq) if element in self.element_finder: del self.element_finder[element] return element
[docs] def delete_element(self, element): """Deletes an element (lazily). :raises: ValueError -- No such element in the priority queue. """ if element not in self.element_finder: raise ValueError("No such element in the priority queue.") entry = self.element_finder[element] entry[1] = self.INVALID
[docs] def reprioritize(self, priority, element): """Updates the priority of an element. :raises: ValueError -- No such element in the priority queue. """ if element not in self.element_finder: raise ValueError("No such element in the priority queue.") entry = self.element_finder[element] self.add_element(priority, element, entry[1]) entry[1] = self.INVALID
[docs] def peek(self): """Returns the element with top (lowest) priority without popping it. :returns: element with top (lowest) priority. :raises: IndexError -- Priority queue is empty. """ if self.is_empty(): raise IndexError("Priority queue is empty.") return self.pq[0][2]
[docs] def contains_element(self, element): """Determines if an element is contained in the priority queue." :returns: bool -- true iff element is in the priority queue. """ return (element in self.element_finder) and \ (self.element_finder[element][1] != self.INVALID)
[docs] def is_empty(self): """Determines if the priority queue has any elements. Performs removal of any elements that were "marked-as-invalid". :returns: true iff the priority queue has no elements. """ while self.pq: if self.pq[0][1] != self.INVALID: return False else: _, _, element = heapq.heappop(self.pq) if element in self.element_finder: del self.element_finder[element] return True

Contents