Source code for weighted_topsort
"""
It is an implementation of Topological sort using Depth First Search.
The implementation provides a way to sort the nodes of the graph based on
the weights assigned to each node. When a node is dependent on two other nodes
the node with lowest weight is resolved first.
The input graph is a json structure with dependecies for the node are
indicated as a list. The weights for the nodes are provided as a set of
tag-value pairs.
"""
import heapq
from collections import deque
[docs]class CyclicGraphError(Exception):
def __init__(self, message):
super().__init__(message)
self.errors = message
[docs]class TopSort(list):
"""
Topological sort with weighted nodes
:param graph: Directed graph to be sorted in json format with
dependencies as values to nodes
:type graph: json
:param weights: The weights of nodes as json key value pairs
:type weights: json
:rtype: list
"""
def __init__(self, graph, weights):
self.data = graph
self.weights = weights.copy()
self.visit_stack = deque()
self.sort_stack = deque()
self.__process_nodes(self.data)
def __iter__(self):
return iter(self.sort_stack)
def __process_nodes(self, jsondata):
keys_heap = [(self.weights.get(k, 1), k) for k in jsondata.keys()]
heapq.heapify(keys_heap)
while keys_heap:
_, key = heapq.heappop(keys_heap)
if not (key in self.visit_stack):
self.visit_stack.append(key)
if jsondata.get(key):
ch_keys_heap = [
(self.weights.get(k, 1), k) for k in jsondata.get(key)
]
heapq.heapify(ch_keys_heap)
while ch_keys_heap:
_, ch_val = heapq.heappop(ch_keys_heap)
if not (ch_val in self.visit_stack):
if str(ch_val) in self.data.keys():
self.__process_nodes(
{ch_val: self.data.get(str(ch_val))})
else:
self.visit_stack.append(ch_val)
self.sort_stack.append(str(ch_val))
elif not (ch_val in self.sort_stack):
raise CyclicGraphError(
'Cycle detected in path {} -> {}'
.format(str(self.visit_stack), str(ch_val)))
self.sort_stack.append(str(key))
return self.sort_stack