1 from types import GenericAlias
2
3 __all__ = ["TopologicalSorter", "CycleError"]
4
5 _NODE_OUT = -1
6 _NODE_DONE = -2
7
8
9 class ESC[4;38;5;81m_NodeInfo:
10 __slots__ = "node", "npredecessors", "successors"
11
12 def __init__(self, node):
13 # The node this class is augmenting.
14 self.node = node
15
16 # Number of predecessors, generally >= 0. When this value falls to 0,
17 # and is returned by get_ready(), this is set to _NODE_OUT and when the
18 # node is marked done by a call to done(), set to _NODE_DONE.
19 self.npredecessors = 0
20
21 # List of successor nodes. The list can contain duplicated elements as
22 # long as they're all reflected in the successor's npredecessors attribute.
23 self.successors = []
24
25
26 class ESC[4;38;5;81mCycleError(ESC[4;38;5;149mValueError):
27 """Subclass of ValueError raised by TopologicalSorter.prepare if cycles
28 exist in the working graph.
29
30 If multiple cycles exist, only one undefined choice among them will be reported
31 and included in the exception. The detected cycle can be accessed via the second
32 element in the *args* attribute of the exception instance and consists in a list
33 of nodes, such that each node is, in the graph, an immediate predecessor of the
34 next node in the list. In the reported list, the first and the last node will be
35 the same, to make it clear that it is cyclic.
36 """
37
38 pass
39
40
41 class ESC[4;38;5;81mTopologicalSorter:
42 """Provides functionality to topologically sort a graph of hashable nodes"""
43
44 def __init__(self, graph=None):
45 self._node2info = {}
46 self._ready_nodes = None
47 self._npassedout = 0
48 self._nfinished = 0
49
50 if graph is not None:
51 for node, predecessors in graph.items():
52 self.add(node, *predecessors)
53
54 def _get_nodeinfo(self, node):
55 if (result := self._node2info.get(node)) is None:
56 self._node2info[node] = result = _NodeInfo(node)
57 return result
58
59 def add(self, node, *predecessors):
60 """Add a new node and its predecessors to the graph.
61
62 Both the *node* and all elements in *predecessors* must be hashable.
63
64 If called multiple times with the same node argument, the set of dependencies
65 will be the union of all dependencies passed in.
66
67 It is possible to add a node with no dependencies (*predecessors* is not provided)
68 as well as provide a dependency twice. If a node that has not been provided before
69 is included among *predecessors* it will be automatically added to the graph with
70 no predecessors of its own.
71
72 Raises ValueError if called after "prepare".
73 """
74 if self._ready_nodes is not None:
75 raise ValueError("Nodes cannot be added after a call to prepare()")
76
77 # Create the node -> predecessor edges
78 nodeinfo = self._get_nodeinfo(node)
79 nodeinfo.npredecessors += len(predecessors)
80
81 # Create the predecessor -> node edges
82 for pred in predecessors:
83 pred_info = self._get_nodeinfo(pred)
84 pred_info.successors.append(node)
85
86 def prepare(self):
87 """Mark the graph as finished and check for cycles in the graph.
88
89 If any cycle is detected, "CycleError" will be raised, but "get_ready" can
90 still be used to obtain as many nodes as possible until cycles block more
91 progress. After a call to this function, the graph cannot be modified and
92 therefore no more nodes can be added using "add".
93 """
94 if self._ready_nodes is not None:
95 raise ValueError("cannot prepare() more than once")
96
97 self._ready_nodes = [
98 i.node for i in self._node2info.values() if i.npredecessors == 0
99 ]
100 # ready_nodes is set before we look for cycles on purpose:
101 # if the user wants to catch the CycleError, that's fine,
102 # they can continue using the instance to grab as many
103 # nodes as possible before cycles block more progress
104 cycle = self._find_cycle()
105 if cycle:
106 raise CycleError(f"nodes are in a cycle", cycle)
107
108 def get_ready(self):
109 """Return a tuple of all the nodes that are ready.
110
111 Initially it returns all nodes with no predecessors; once those are marked
112 as processed by calling "done", further calls will return all new nodes that
113 have all their predecessors already processed. Once no more progress can be made,
114 empty tuples are returned.
115
116 Raises ValueError if called without calling "prepare" previously.
117 """
118 if self._ready_nodes is None:
119 raise ValueError("prepare() must be called first")
120
121 # Get the nodes that are ready and mark them
122 result = tuple(self._ready_nodes)
123 n2i = self._node2info
124 for node in result:
125 n2i[node].npredecessors = _NODE_OUT
126
127 # Clean the list of nodes that are ready and update
128 # the counter of nodes that we have returned.
129 self._ready_nodes.clear()
130 self._npassedout += len(result)
131
132 return result
133
134 def is_active(self):
135 """Return ``True`` if more progress can be made and ``False`` otherwise.
136
137 Progress can be made if cycles do not block the resolution and either there
138 are still nodes ready that haven't yet been returned by "get_ready" or the
139 number of nodes marked "done" is less than the number that have been returned
140 by "get_ready".
141
142 Raises ValueError if called without calling "prepare" previously.
143 """
144 if self._ready_nodes is None:
145 raise ValueError("prepare() must be called first")
146 return self._nfinished < self._npassedout or bool(self._ready_nodes)
147
148 def __bool__(self):
149 return self.is_active()
150
151 def done(self, *nodes):
152 """Marks a set of nodes returned by "get_ready" as processed.
153
154 This method unblocks any successor of each node in *nodes* for being returned
155 in the future by a call to "get_ready".
156
157 Raises :exec:`ValueError` if any node in *nodes* has already been marked as
158 processed by a previous call to this method, if a node was not added to the
159 graph by using "add" or if called without calling "prepare" previously or if
160 node has not yet been returned by "get_ready".
161 """
162
163 if self._ready_nodes is None:
164 raise ValueError("prepare() must be called first")
165
166 n2i = self._node2info
167
168 for node in nodes:
169
170 # Check if we know about this node (it was added previously using add()
171 if (nodeinfo := n2i.get(node)) is None:
172 raise ValueError(f"node {node!r} was not added using add()")
173
174 # If the node has not being returned (marked as ready) previously, inform the user.
175 stat = nodeinfo.npredecessors
176 if stat != _NODE_OUT:
177 if stat >= 0:
178 raise ValueError(
179 f"node {node!r} was not passed out (still not ready)"
180 )
181 elif stat == _NODE_DONE:
182 raise ValueError(f"node {node!r} was already marked done")
183 else:
184 assert False, f"node {node!r}: unknown status {stat}"
185
186 # Mark the node as processed
187 nodeinfo.npredecessors = _NODE_DONE
188
189 # Go to all the successors and reduce the number of predecessors, collecting all the ones
190 # that are ready to be returned in the next get_ready() call.
191 for successor in nodeinfo.successors:
192 successor_info = n2i[successor]
193 successor_info.npredecessors -= 1
194 if successor_info.npredecessors == 0:
195 self._ready_nodes.append(successor)
196 self._nfinished += 1
197
198 def _find_cycle(self):
199 n2i = self._node2info
200 stack = []
201 itstack = []
202 seen = set()
203 node2stacki = {}
204
205 for node in n2i:
206 if node in seen:
207 continue
208
209 while True:
210 if node in seen:
211 # If we have seen already the node and is in the
212 # current stack we have found a cycle.
213 if node in node2stacki:
214 return stack[node2stacki[node] :] + [node]
215 # else go on to get next successor
216 else:
217 seen.add(node)
218 itstack.append(iter(n2i[node].successors).__next__)
219 node2stacki[node] = len(stack)
220 stack.append(node)
221
222 # Backtrack to the topmost stack entry with
223 # at least another successor.
224 while stack:
225 try:
226 node = itstack[-1]()
227 break
228 except StopIteration:
229 del node2stacki[stack.pop()]
230 itstack.pop()
231 else:
232 break
233 return None
234
235 def static_order(self):
236 """Returns an iterable of nodes in a topological order.
237
238 The particular order that is returned may depend on the specific
239 order in which the items were inserted in the graph.
240
241 Using this method does not require to call "prepare" or "done". If any
242 cycle is detected, :exc:`CycleError` will be raised.
243 """
244 self.prepare()
245 while self.is_active():
246 node_group = self.get_ready()
247 yield from node_group
248 self.done(*node_group)
249
250 __class_getitem__ = classmethod(GenericAlias)