"""
.. module:: directed_paths
:synopsis: Defines several functions for executing various
path/connectivity queries on a directed hypergraph.
"""
try:
from queue import Queue
except ImportError:
from Queue import Queue
from halp.directed_hypergraph import DirectedHypergraph
from halp.utilities.priority_queue import PriorityQueue
# TODO-A: consider including target_node (with default value as None) in visit
# and b-visit to allow for early stoppage in an is_connected check and in an
# is_b_connected check
# TODO-B: consider maybe also caching the results from one execution of
# is_connected and is_b_connected to be able to check many node's for
# connectivity in only a single call of either visit or b_visit
[docs]def visit(H, source_node):
"""Executes the 'Visit' algorithm described in the paper:
Giorgio Gallo, Giustino Longo, Stefano Pallottino, Sang Nguyen,
Directed hypergraphs and applications, Discrete Applied Mathematics,
Volume 42, Issues 2-3, 27 April 1993, Pages 177-201, ISSN 0166-218X,
http://dx.doi.org/10.1016/0166-218X(93)90045-P.
(http://www.sciencedirect.com/science/article/pii/0166218X9390045P)
The Visit algorithm begins from a source node and traverses a hyperedge
after any node in the hyperedge's tail has been reached.
:param H: the hypergraph to perform the 'Visit' algorithm on.
:param source_node: the initial node to begin traversal from.
:returns: set -- nodes that were visited in this traversal.
dict -- mapping from each node to the ID of the hyperedge that
preceeded it in this traversal; will map a node to None
if that node wasn't visited or if that node is the source
node.
dict -- mapping from each hyperedge ID to the node that preceeded
it in this traversal.
:raises: TypeError -- Algorithm only applicable to directed hypergraphs
"""
if not isinstance(H, DirectedHypergraph):
raise TypeError("Algorithm only applicable to directed hypergraphs")
node_set = H.get_node_set()
# Pv keeps track of the ID of the hyperedge that directely
# preceeded each node in the traversal
Pv = {node: None for node in node_set}
hyperedge_id_set = H.get_hyperedge_id_set()
# Pe keeps track of the node that directedly preceeded
# each hyperedge in the traversal
Pe = {hyperedge_id: None for hyperedge_id in hyperedge_id_set}
# Explicitly tracks the set of visited nodes
visited_nodes = set([source_node])
Q = Queue()
Q.put(source_node)
while not Q.empty():
current_node = Q.get()
# At current_node, we can traverse each hyperedge in its forward star
for hyperedge_id in H.get_forward_star(current_node):
if Pe[hyperedge_id] is not None:
continue
Pe[hyperedge_id] = current_node
# Traversing a hyperedge in current_node's forward star yields
# the set of head nodes of the hyperedge; visit each head node
for head_node in H.get_hyperedge_head(hyperedge_id):
if head_node in visited_nodes:
continue
Pv[head_node] = hyperedge_id
Q.put(head_node)
visited_nodes.add(head_node)
return visited_nodes, Pv, Pe
[docs]def is_connected(H, source_node, target_node):
"""Checks if a target node is connected to a source node. That is,
this method determines if a target node can be visited from the source
node in the sense of the 'Visit' algorithm.
Refer to 'visit's documentation for more details.
:param H: the hypergraph to check connectedness on.
:param source_node: the node to check connectedness to.
:param target_node: the node to check connectedness of.
:returns: bool -- whether target_node can be visited from source_node.
"""
visited_nodes, Pv, Pe = visit(H, source_node)
return target_node in visited_nodes
def _x_visit(H, source_node, b_visit):
"""General form of the B-Visit algorithm, extended to also perform
an implicit F-Visit if the b_visit flag is not set (providing better
time/memory performance than explcitily taking the hypergraph's
symmetric image and then performing the B-Visit on that).
Refer to 'b_visit's or 'f_visit's documentation for more details.
:param H: the hypergraph to perform the 'B-Visit' algorithm on.
:param source_node: the initial node to begin traversal from.
:param b_visit: boolean flag representing whether a B-Visit should
be performed (vs an F-Visit).
:returns: set -- nodes that were x-visited in this traversal.
dict -- mapping from each node visited to the ID of the hyperedge
that preceeded it in this traversal.
dict -- mapping from each hyperedge ID to the node that preceeded
it in this traversal.
dict -- mapping from each node to an integer representing the
cardinality of the path from the source node to that node.
:raises: TypeError -- Algorithm only applicable to directed hypergraphs
"""
if not isinstance(H, DirectedHypergraph):
raise TypeError("Algorithm only applicable to directed hypergraphs")
# If the b_visit flag is set, perform a traditional B-Visit
if b_visit:
forward_star = H.get_forward_star
hyperedge_tail = H.get_hyperedge_tail
hyperedge_head = H.get_hyperedge_head
# If the b_visit flag is not set, implicitly perform an F-Visit by
# implicitly taking the symmetric image (what the 'else' statement
# is for) and then performing a traditional B-Visit
else:
forward_star = H.get_backward_star
hyperedge_tail = H.get_hyperedge_head
hyperedge_head = H.get_hyperedge_tail
node_set = H.get_node_set()
# Pv keeps track of the ID of the hyperedge that directely
# preceeded each node in the traversal
Pv = {node: None for node in node_set}
# v keeps track of the cardinality of the path from the source node to
# any other B-connected node ('inf' cardinality for non-B-connected nodes)
v = {node: float("inf") for node in node_set}
v[source_node] = 0
hyperedge_id_set = H.get_hyperedge_id_set()
# Pe keeps track of the node that directedly preceeded
# each hyperedge in the traversal
Pe = {hyperedge_id: None for hyperedge_id in hyperedge_id_set}
# k keeps track of how many nodes in the tail of each hyperedge are
# B-connected (when all nodes in a tail are B-connected, that hyperedge
# can then be traversed)
k = {hyperedge_id: 0 for hyperedge_id in hyperedge_id_set}
# Explicitly tracks the set of B-visited nodes
x_visited_nodes = set([source_node])
Q = Queue()
Q.put(source_node)
while not Q.empty():
current_node = Q.get()
# At current_node, we can traverse each hyperedge in its forward star
for hyperedge_id in forward_star(current_node):
# Since we're arrived at a new node, we increment
# k[hyperedge_id] to indicate that we've reached 1 new
# node in this hyperedge's tail
k[hyperedge_id] += 1
# Traverse this hyperedge only when we have reached all the nodes
# in its tail (i.e., when k[hyperedge_id] == |T(hyperedge_id)|)
if k[hyperedge_id] == len(hyperedge_tail(hyperedge_id)):
Pe[hyperedge_id] = current_node
# Traversing the hyperedge yields the set of head nodes of
# the hyperedge; B-visit each head node
for head_node in hyperedge_head(hyperedge_id):
if head_node in x_visited_nodes:
continue
Pv[head_node] = hyperedge_id
Q.put(head_node)
v[head_node] = v[Pe[hyperedge_id]] + 1
x_visited_nodes.add(head_node)
return x_visited_nodes, Pv, Pe, v
[docs]def b_visit(H, source_node):
"""Executes the 'B-Visit' algorithm described in the paper:
Giorgio Gallo, Giustino Longo, Stefano Pallottino, Sang Nguyen,
Directed hypergraphs and applications, Discrete Applied Mathematics,
Volume 42, Issues 2-3, 27 April 1993, Pages 177-201, ISSN 0166-218X,
http://dx.doi.org/10.1016/0166-218X(93)90045-P.
(http://www.sciencedirect.com/science/article/pii/0166218X9390045P)
The B-Visit algorithm begins from a source node and traverses a hyperedge
after all nodes in the hyperedge's tail have been reached.
:param H: the hypergraph to perform the 'B-Visit' algorithm on.
:param source_node: the initial node to begin traversal from.
:returns: set -- nodes that were B-visited in this traversal.
dict -- mapping from each node visited to the ID of the hyperedge
that preceeded it in this traversal.
dict -- mapping from each hyperedge ID to the node that preceeded
it in this traversal.
dict -- mapping from each node to an integer representing the
cardinality of the path from the source node to that node.
"""
return _x_visit(H, source_node, True)
[docs]def is_b_connected(H, source_node, target_node):
"""Checks if a target node is B-connected to a source node.
A node t is B-connected to a node s iff:
- t is s, or
- there exists an edge in the backward star of t such that all nodes in
the tail of that edge are B-connected to s
In other words, this method determines if a target node can be B-visited
from the source node in the sense of the 'B-Visit' algorithm. Refer to
'b_visit's documentation for more details.
:param H: the hypergraph to check B-connectedness on.
:param source_node: the node to check B-connectedness to.
:param target_node: the node to check B-connectedness of.
:returns: bool -- whether target_node can be visited from source_node.
"""
b_visited_nodes, Pv, Pe, v = b_visit(H, source_node)
return target_node in b_visited_nodes
[docs]def f_visit(H, source_node):
"""Executes the 'F-Visit' algorithm alluded to in the paper:
Giorgio Gallo, Giustino Longo, Stefano Pallottino, Sang Nguyen,
Directed hypergraphs and applications, Discrete Applied Mathematics,
Volume 42, Issues 2-3, 27 April 1993, Pages 177-201, ISSN 0166-218X,
http://dx.doi.org/10.1016/0166-218X(93)90045-P.
(http://www.sciencedirect.com/science/article/pii/0166218X9390045P)
The F-Visit algorithm performs a B-Visit on the hypergraph's symmetric
image, beginning at the source node. Refer to 'b_visit's documentation
for more details.
:param H: the hypergraph to perform the 'F-Visit' algorithm on.
:param source_node: the initial node to begin traversal from.
:returns: set -- nodes that were F-visited in this traversal.
dict -- mapping from each node to the ID of the hyperedge that
preceeded it in this traversal.
dict -- mapping from each hyperedge ID to the node that preceeded
it in this traversal.
dict -- mapping from each node to an integer representing the
cardinality of the path from the source node to that node.
"""
return _x_visit(H, source_node, False)
[docs]def is_f_connected(H, source_node, target_node):
"""Checks if a target node is F-connected to a source node.
A node t is F-connected to a node s iff s if B-connected to t.
Refer to 'f_visit's or 'is_b_connected's documentation for more details.
:param H: the hypergraph to check F-connectedness on.
:param source_node: the node to check F-connectedness to.
:param target_node: the node to check F-connectedness of.
:returns: bool -- whether target_node can be visited from source_node.
"""
f_visited_nodes, Pv, Pe, v = f_visit(H, source_node)
return target_node in f_visited_nodes
[docs]def sum_function(tail_nodes, W):
"""Additive sum function for nodes in the tail of a hyperedge.
:param tail_nodes: nodes in the tail of a hyperedge that, in conjunction
with the weight of the hyperedge, will additively
constitute the weight of the head node
:param W: node weighting function.
:returns: int -- sum of the weights of tail_nodes.
"""
return sum(W[node] for node in tail_nodes)
[docs]def distance_function(tail_nodes, W):
"""Additive distance (max) function for nodes in the tail of a hyperedge.
Also commonly referred to as the 'rank' function.
:param tail_nodes: nodes in the tail of a hyperedge that, in conjunction
with the weight of the hyperedge, will additively
constitute the weight of the head node.
:param W: node weighting function.
:returns: int -- max of the weights of tail_nodes.
"""
return max(W[node] for node in tail_nodes)
[docs]def gap_function(tail_nodes, W):
"""Additive min function for nodes in the tail of a hyperedge.
:param tail_nodes: nodes in the tail of a hyperedge that, in conjunction
with the weight of the hyperedge, will additively
constitute the weight of the head node
:param W: node weighting function
:returns: int -- max of the weights of tail_nodes
"""
return min(W[node] for node in tail_nodes)
def _shortest_x_tree(H, source_node, b_tree,
F=sum_function, valid_ordering=False):
"""General form of the Shorest B-Tree algorithm, extended to also
perform the implicit Shortest F-Tree procedure if the b_tree flag is
not set (providing better time/memory performance than explcitily taking
the hypergraph's symmetric image and then performing the SBT procedure
on that).
Uses priority queue to achieve O(size(H)*lg(n)) runtime.
Refer to 'shorest_b_tree's or 'shorest_f_tree's documentation for
more details.
:param H: the H to perform the 'SXT' algorithm on.
:param source_node: the root of the tree to be found.
:param b_tree: boolean flag representing whether the Shortest B-Tree
algorithm should be executed (vs the Shortest F-Tree).
:param F: function pointer to any additive weight function; that is,
any function that is only a function of the weights of the
nodes in the tail of a hyperedge.
:param valid_ordering: a boolean flag to signal whether or not a valid
ordering of the nodes should be returned.
:returns: dict -- mapping from each node to the ID of the hyperedge that
preceeded it in this traversal.
dict -- mapping from each node to the node's weight.
list -- [only if valid_ordering argument is passed] a valid
ordering of the nodes.
:raises: TypeError -- Algorithm only applicable to directed hypergraphs
"""
if not isinstance(H, DirectedHypergraph):
raise TypeError("Algorithm only applicable to directed hypergraphs")
if b_tree:
forward_star = H.get_forward_star
hyperedge_tail = H.get_hyperedge_tail
hyperedge_head = H.get_hyperedge_head
else:
forward_star = H.get_backward_star
hyperedge_tail = H.get_hyperedge_head
hyperedge_head = H.get_hyperedge_tail
hyperedge_weight = H.get_hyperedge_weight
node_set = H.get_node_set()
# Pv keeps track of the ID of the hyperedge that directely
# preceeded each node in the traversal
Pv = {node: None for node in node_set}
hyperedge_ids = H.get_hyperedge_id_set()
# W keeps track of the smallest weight path from the source node
# to each node
W = {node: float("inf") for node in node_set}
W[source_node] = 0
# k keeps track of how many nodes in the tail of each hyperedge are
# B-connected (when all nodes in a tail are B-connected, that hyperedge
# can then be traversed)
k = {hyperedge_id: 0 for hyperedge_id in hyperedge_ids}
# List of nodes removed from the priority queue in the order that
# they were removed
ordering = []
Q = PriorityQueue()
Q.add_element(W[source_node], source_node)
while not Q.is_empty():
# At current_node, we can traverse each hyperedge in its forward star
current_node = Q.get_top_priority()
ordering.append(current_node)
for hyperedge_id in forward_star(current_node):
# Since we're arrived at a new node, we increment
# k[hyperedge_id] to indicate that we've reached 1 new
# node in this hyperedge's tail
k[hyperedge_id] += 1
# Traverse this hyperedge only when we have reached all the nodes
# in its tail (i.e., when k[hyperedge_id] == |T(hyperedge_id)|)
if k[hyperedge_id] == len(hyperedge_tail(hyperedge_id)):
f = F(hyperedge_tail(hyperedge_id), W)
# For each node in the head of the newly-traversed hyperedge,
# if the previous weight of the node is more than the new
# weight...
for head_node in \
[node for node in hyperedge_head(hyperedge_id) if
W[node] > hyperedge_weight(hyperedge_id) + f]:
# Update its weight to the new, smaller weight
W[head_node] = hyperedge_weight(hyperedge_id) + f
Pv[head_node] = hyperedge_id
# If it's not already in the priority queue...
if not Q.contains_element(head_node):
# Add it to the priority queue
Q.add_element(W[head_node], head_node)
else:
# Otherwise, decrease it's key in the priority queue
Q.reprioritize(W[head_node], head_node)
if valid_ordering:
return Pv, W, ordering
else:
return Pv, W
[docs]def shortest_b_tree(H, source_node,
F=sum_function, valid_ordering=False):
"""Executes the Shortest B-Tree (SBT) algorithm described in the paper:
Giorgio Gallo, Giustino Longo, Stefano Pallottino, Sang Nguyen,
Directed hypergraphs and applications, Discrete Applied Mathematics,
Volume 42, Issues 2-3, 27 April 1993, Pages 177-201, ISSN 0166-218X,
http://dx.doi.org/10.1016/0166-218X(93)90045-P.
(http://www.sciencedirect.com/science/article/pii/0166218X9390045P)
SBT finds a set of minimum weight B-paths from a source node to all
the nodes y which are B-connected to it (when additive weight functions
are used). Refer to 'is_b_connected's documentation for more details.
:param H: the hypergraph to perform the 'SBT' algorithm on.
:param source_node: the root of the tree to be found.
:param F: function pointer to any additive weight function; that is,
any function that is only a function of the weights of the
nodes in the tail of a hyperedge.
:param valid_ordering: a boolean flag to signal whether or not a valid
ordering of the nodes should be returned.
:returns: dict -- mapping from each node to the ID of the hyperedge that
preceeded it in this traversal.
dict -- mapping from each node to the node's weight.
list -- [only if valid_ordering argument is passed] a valid
ordering of the nodes.
"""
return _shortest_x_tree(H, source_node, True, F, valid_ordering)
[docs]def shortest_f_tree(H, source_node,
F=sum_function, valid_ordering=False):
"""Executes the Shortest F-Tree algorithm, which is simply an execution
of the Shorest B-Tree procedure from the source node on the hypergraph's
symmetric image. Refer to 'shortest_b_tree's documentation for more
details.
:param hypergraph: the hypergraph to perform the 'SFT' algorithm on.
:param source_node: the root of the tree to be found.
:param F: function pointer to any additive weight function; that is,
any function that is only a function of the weights of the
nodes in the tail of a hyperedge.
:param valid_ordering: a boolean flag to signal whether or not a valid
ordering of the nodes should be returned.
:returns: dict -- mapping from each node to the ID of the hyperedge that
preceeded it in this traversal.
dict -- mapping from each node to the node's weight.
list -- [only if valid_ordering argument is passed] a valid
ordering of the nodes.
"""
return _shortest_x_tree(H, source_node, False, F, valid_ordering)
[docs]def get_hypertree_from_predecessors(H, Pv, source_node,
node_weights=None, attr_name="weight"):
"""Gives the hypertree (i.e., the subhypergraph formed from the union of
the set of paths from an execution of, e.g., the SBT algorithm) defined by
Pv beginning at a source node. Returns a dictionary mapping each node to
the ID of the hyperedge that preceeded it in the path (i.e., a Pv vector).
Assigns the node weights (if provided) as attributes of the nodes (e.g.,
the rank of that node in a specific instance of the SBT algorithm, or the
cardinality of that node in a B-Visit traversal, etc.).
:note: The IDs of the hyperedges in the subhypergraph returned may be
different than those in the original hypergraph (even though the
tail and head sets are identical).
:param H: the hypergraph which the path algorithm was executed on.
:param Pv: dictionary mapping each node to the ID of the hyperedge that
preceeded it in the path.
:param source_node: the root of the executed path algorithm.
:param node_weights: [optional] dictionary mapping each node to some weight
measure.
:param attr_name: key into the nodes' attribute dictionaries for their
weight values (if node_weights is provided).
:returns: DirectedHypergraph -- subhypergraph induced by the path
algorithm specified by the predecessor vector (Pv) from a
source node.
:raises: TypeError -- Algorithm only applicable to directed hypergraphs
"""
if not isinstance(H, DirectedHypergraph):
raise TypeError("Algorithm only applicable to directed hypergraphs")
sub_H = DirectedHypergraph()
# If node weights are not provided, simply collect all the nodes that are
# will be in the hypertree
if node_weights is None:
nodes = [node for node in Pv.keys() if Pv[node] is not None]
nodes.append(source_node)
# If node weights are provided, collect all the nodes that will be in the
# tree and pair them with their corresponding weights
else:
nodes = [(node, {attr_name: node_weights[node]})
for node in Pv.keys() if Pv[node] is not None]
nodes.append((source_node, {attr_name: node_weights[source_node]}))
# Add the collected elements to the hypergraph
sub_H.add_nodes(nodes)
# Add all hyperedges, specified by Pv, to the hypergraph
hyperedges = [(H.get_hyperedge_tail(hyperedge_id),
H.get_hyperedge_head(hyperedge_id),
H.get_hyperedge_attributes(hyperedge_id))
for hyperedge_id in Pv.values() if hyperedge_id is not None]
sub_H.add_hyperedges(hyperedges)
return sub_H
[docs]def get_hyperpath_from_predecessors(H, Pv, source_node, destination_node,
node_weights=None, attr_name="weight"):
"""Gives the hyperpath (DirectedHypergraph) representing the shortest
B-hyperpath from the source to the destination, given a predecessor
function and source and destination nodes.
:note: The IDs of the hyperedges in the subhypergraph returned may be
different than those in the original hypergraph (even though the
tail and head sets are identical).
:param H: the hypergraph which the path algorithm was executed on.
:param Pv: dictionary mapping each node to the ID of the hyperedge that
preceeded it in the path.
:param source_node: the source node of the path.
:param destination_node: the destination node of the path.
:returns: DirectedHypergraph -- shortest B-hyperpath from source_node to
destination_node.
:raises: TypeError -- Algorithm only applicable to directed hypergraphs
:raises: KeyError -- Node key in predecessor is not in H
:raises: KeyError -- Hyperedge key in predecessor is not in H
:raises: ValueError -- Multiple nodes without predecessor
:raises: ValueError -- Hypertree does not have source node
"""
if not isinstance(H, DirectedHypergraph):
raise TypeError("Algorithm only applicable to directed hypergraphs")
# Check that Pv is a valid predecessor function:
# - keys must be nodes in H mapping to hyperedges in H
# - exactly one node must map to None (i.e., only one node
# without predecessor)
nodes_without_predecessor = 0
for node, hyperedge_id in Pv.items():
if not H.has_node(node):
raise KeyError(
"Node key %s in predecessor is not in H" % node)
if hyperedge_id is None:
nodes_without_predecessor += 1
elif not H.has_hyperedge_id(hyperedge_id):
raise KeyError(
"Hyperedge key %s in predecessor is not in H" % hyperedge_id)
if nodes_without_predecessor > 1:
raise ValueError(
"Multiple nodes without predecessor. %s received" % Pv)
elif nodes_without_predecessor == 0:
raise ValueError(
"Hypertree does not have source node. %s received" % Pv)
path = DirectedHypergraph()
# keep track of which nodes are or have been processed
processedOrInQueue = {n: False for n in Pv}
nodesToProcess = [destination_node]
processedOrInQueue[destination_node] = True
while nodesToProcess:
node = nodesToProcess.pop(0)
hyperedge_id = Pv[node]
if hyperedge_id:
for n in H.get_hyperedge_tail(hyperedge_id):
if not processedOrInQueue[n]:
nodesToProcess.append(n)
processedOrInQueue[n] = True
path.add_hyperedge(H.get_hyperedge_tail(hyperedge_id),
H.get_hyperedge_head(hyperedge_id),
weight=H.get_hyperedge_weight(
hyperedge_id))
elif not path.has_node(node):
path.add_node(node)
return path