Skip to content

Instantly share code, notes, and snippets.

@tommct
Last active April 30, 2021 17:06
Show Gist options
  • Save tommct/731875648fdfeebc3118bb5c51c65993 to your computer and use it in GitHub Desktop.
Save tommct/731875648fdfeebc3118bb5c51c65993 to your computer and use it in GitHub Desktop.
Generic Dijkstra's shortest paths implementation in Python using a priority queue with callback functionality as it visits nodes.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook provides an interface to [Dijkstra's shortest paths algorithm](https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm) with arbitrary graphs.\n",
"The nodes of the graph contain user-defined Python objects and the edges are weight penalties between those objects.\n",
"Given this structure, the algorithm provides the shortest path between two objects.\n",
"\n",
"The interface enables customized, extensible approaches such as the dynamic construction of graphs, user-defined weights of nodes and edges, and arbitrary terminating criteria."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# -*- coding: utf-8 -*-\n",
"\"\"\"\n",
"Implements Dijkstra's algorithm using a priority queue with callback functionality so that \n",
"nodes can be processed with user-defined functionality.\n",
"\"\"\"\n",
"\n",
"import sys\n",
"import queue\n",
"\n",
"\n",
"class Edge(object):\n",
" \"\"\"\n",
" Each edge of the graph is assigned to the `parents` or `children` list of a Node object. \n",
" Therefore, we assume that Edge objects are placed in the `parents` or `children` lists \n",
" of a Node object, so we do not have to specify the direction or relationship here.\n",
" \"\"\"\n",
" def __init__(self, node, weight=None, **kwargs):\n",
" \"\"\"\n",
" Set member variables to those passed in.\n",
"\n",
" Args:\n",
" node (Node): A node that is either a parent or a child of where this edge \n",
" object resides. If the Edge is in a children list, then the node object \n",
" here in the Edge object is a child node to the Node that holds the\n",
" children list.\n",
" **kwargs: User-defined values that can be passed in, such as edge labels or \n",
" other attributes.\n",
" \"\"\"\n",
" self.node = node\n",
" self.weight = weight\n",
" self.__dict__.update(kwargs)\n",
"\n",
"\n",
"class Node(object):\n",
" \"\"\"\n",
" A graph consists of Nodes and Edges. This object is the Node in a directed graph, \n",
" which contains two lists of Edge objects: parents and children. Also, this object \n",
" maintains a reference to an arbitrary user-defined object.\n",
" \"\"\"\n",
" def __init__(self, obj, isleaf=False):\n",
" \"\"\"\n",
"\n",
" Args:\n",
" obj: A user-defined embedded object within the Node.\n",
" isleaf: Flag to determine if a Node is to be a leaf along a path. Indeed, \n",
" within a graph, it may have child edges, but for path-building, this flag \n",
" should be set to True for paths going through it to terminate if it does \n",
" have children.\n",
" \"\"\"\n",
" self.obj = obj\n",
" self.parents = [] # List of Edge objects\n",
" self.children = [] # List of Edge objects\n",
" self.distance = sys.maxsize # Minimum cumulative path weight to this node that \n",
" # gets set/updated in the priority_paths algorithm\n",
" self.processed = False # Flag set when this node is popped from the queue.\n",
"\n",
" # The following two functions are needed for queue so that items with equivalent \n",
" # priority will process correctly, which is that they will be drawn randomly.\n",
" def __eq__(self, other):\n",
" return self.obj == other.obj\n",
"\n",
" def __lt__(self, other):\n",
" return self.obj.__repr__() < other.obj.__repr__()\n",
"\n",
" def __repr__(self):\n",
" return repr(self.obj)\n",
" # return str(self.__class__) + \": \" + str(self.__dict__)\n",
"\n",
" \n",
"def dijkstras_using_queue(G, source_node, pop_callback=None, pop_kwargs={},\n",
" abort_func=None, abort_kwargs={}):\n",
" \"\"\"\n",
" Retrieve a graph that contains weighted paths from a source node to goal node(s).\n",
"\n",
" Args:\n",
" G: A graph, which is a list of Node objects. The graph must have at least a \n",
" starting Node.\n",
"\n",
" > Note: The obj object within the node may need to override \n",
" `__eq__` and `__lt__` for the priority queue to function correctly.\n",
"\n",
" source_node: The starting Node element in G.\n",
"\n",
" pop_callback: If not None, this is a function that gets called when an item \n",
" is popped from the priority queue and before its children are processed. \n",
" In dynamic, generative contexts, this function could be used to create \n",
" the Node's children. This function needs to accept the currently popped \n",
" Node, G, the current path list, and all paths. For example, it may look \n",
" something like: `def my_pop(node, G, path, paths): ...`\n",
"\n",
" pop_kwargs: dict of kwargs passed to pop_callback in addition to the \n",
" currently popped node, G, path, and paths. For example if \n",
" `pop_kwargs == {'param1': 1, 'param2': 'val2'}`, then the pop_callback\n",
" would look something like `def my_pop(node, G, path, paths, param1, param2): ...`\n",
"\n",
" abort_func: Callback function that permits an early abort of the search \n",
" algorithm. If not specified, the algorithm will run while items are in \n",
" the queue. If specified, the function will be called with the currently \n",
" popped node, the priority queue, the graph G, and abort_kwargs.\n",
"\n",
" abort_kwargs: dict of kwargs that are passed to the abort_func function \n",
" in addition to the graph G, and paths.\n",
"\n",
" Returns: Nothing, but paths gets updated and callback functions may alter G.\n",
"\n",
" \"\"\"\n",
" prev = {}\n",
" \n",
" q = queue.PriorityQueue() # Create a priority queue.\n",
" for node in G.values():\n",
" node.distance = sys.maxsize\n",
" node.processed = False\n",
"\n",
" # The queue gets fed tuples in the format:\n",
" #\n",
" # (priority, Node)\n",
" #\n",
" # where lowest priority values are popped first.\n",
" source_node.distance = 0\n",
" q.put((source_node.distance, source_node))\n",
"\n",
" def _stopfunc(q, abort_func, abort_kwargs):\n",
" if q.empty():\n",
" return True\n",
" if abort_func:\n",
" return abort_func(G, **abort_kwargs)\n",
" return False\n",
"\n",
" while not _stopfunc(q, abort_func, abort_kwargs):\n",
" priority, node = q.get() # Pop the next item off the queue\n",
" if node.processed == True:\n",
" continue\n",
" if pop_callback is not None:\n",
" try:\n",
" pop_callback(node, G, **pop_kwargs)\n",
" except BaseException as err:\n",
" print(\"priority_graph popcallback {}\".format(err))\n",
"\n",
" for edge in node.children:\n",
" child_node = edge.node\n",
" child_distance = edge.weight + node.distance\n",
" \n",
" # Add the cumulative weight from the previous element in the path \n",
" # and add to queue.\n",
" if child_distance < child_node.distance:\n",
" child_node.distance = child_distance\n",
" q.put((child_node.distance, child_node))\n",
" prev[child_node.obj] = node.obj\n",
" node.processed = True\n",
" return prev"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example\n",
"We use the following graph from [Cormen et al.](https://en.wikipedia.org/wiki/Introduction_to_Algorithms) Section 24.3: Dijkstra's algorithm. ![Cormen 24.3 Image](http://www.euroinformatica.ro/documentation/programming/!!!Algorithms_CORMEN!!!/images/fig618_01_0.jpg)\n",
"\n",
"In this implementation, the graph, *G*, operates in a discovery mode. That is, it begins with a graph with a single node and an arbitary target node that does not yet belong to the graph. As the algorithm progresses, it builds an optimal path to the intended target. This type of implementation permits optimal graphs to be dynamically computed rather than to have computed a complete graph and ask for its shortest path.\n",
"\n",
"This implementation provides examples of various uses of callback functions:\n",
"\n",
" * `distance` for the edge weights between any two nodes.\n",
" * `graph_gen_callback` for processing each node as it is popped from the priority queue and adding new ones as this is operating in discovery mode.\n",
" * `abort_func` to have arbitrary stopping criteria. This could, for example, deal with timeouts, graphs becoming too large, or to even keep going a few more iterations to fill out the graph more for alternative and suboptimal paths."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def distance(src_node, tgt_node, goal_nodes):\n",
" \"\"\"Provide the distance between the src_node and tgt_node also given \n",
" a goal_node\"\"\"\n",
" if src_node.obj == 's':\n",
" if tgt_node.obj == 't':\n",
" return 10\n",
" if tgt_node.obj == 'y':\n",
" return 5\n",
" elif src_node.obj == 't':\n",
" if tgt_node.obj == 'y':\n",
" return 2\n",
" if tgt_node.obj == 'x':\n",
" return 1\n",
" elif src_node.obj == 'x':\n",
" if tgt_node.obj == 'z':\n",
" return 4\n",
" elif src_node.obj == 'y':\n",
" if tgt_node.obj == 't':\n",
" return 3\n",
" if tgt_node.obj == 'x':\n",
" return 9\n",
" if tgt_node.obj == 'z':\n",
" return 2\n",
" elif src_node.obj == 'z':\n",
" if tgt_node.obj == 's':\n",
" return 7\n",
" if tgt_node.obj == 'x':\n",
" return 6\n",
" return 100000\n",
"\n",
"\n",
"def obj_in_graph(label, G):\n",
" \"\"\"If an object with this label already exists in the graph, return it. \n",
" Otherwise return None.\"\"\"\n",
" try:\n",
" return G[label]\n",
" except KeyError:\n",
" return None\n",
" except BaseException as err:\n",
" raise err\n",
"\n",
"\n",
"def graph_gen_callback(node, G, max_edgeweight=100, max_pathweight=100, \n",
" distance_callback=None, distance_kwargs={}):\n",
" \"\"\"\n",
" This generates the children associated with a given node from the graph \n",
" frequently used to illustrate Dijkstra's shortest paths algorithm. An example is at\n",
" http://www.euroinformatica.ro/documentation/programming/!!!Algorithms_CORMEN!!!/images/fig618_01_0.jpg\n",
"\n",
" Each Node is identified by its obj, which in this case is a letter. \n",
" Children are specified by their node labels and weights as a tuple.\n",
" \"\"\"\n",
" if len(node.children) > 0:\n",
" # We have already made children. This would be called if the graph contains cycles\n",
" # or if a shorter path to this node was already found. No need to process again.\n",
" return\n",
"\n",
" child_args = []\n",
" if node.obj == 's':\n",
" child_args = ['t', 'y']\n",
" elif node.obj == 't':\n",
" child_args = ['y', 'x']\n",
" elif node.obj == 'x':\n",
" child_args = ['z']\n",
" elif node.obj == 'y':\n",
" child_args = ['t', 'x', 'z']\n",
" elif node.obj == 'z':\n",
" child_args = ['s', 'x']\n",
"\n",
" path_weight = node.distance # Cumulative path weight to here.\n",
" for tgt_label in child_args:\n",
" child_node = obj_in_graph(tgt_label, G)\n",
"\n",
" if child_node is None:\n",
" child_node = Node(tgt_label)\n",
" G[tgt_label] = (child_node)\n",
" \n",
" if distance_callback:\n",
" weight = distance_callback(node, child_node, **distance_kwargs)\n",
" \n",
" if weight > max_edgeweight:\n",
" continue\n",
" if path_weight + weight > max_pathweight:\n",
" continue\n",
"\n",
" # To test some error functionality, uncomment the following.\n",
" # if tgt_label == 't' and node.obj == 'y':\n",
" # raise ValueError()\n",
"\n",
" node.children.append(Edge(node=child_node, weight=weight))\n",
" child_node.parents.append(Edge(node=node, weight=weight))\n",
"\n",
"def abort_func(G, max_nodes):\n",
" \"\"\"\n",
" Determine if we should abort. If so, return True otherwise return False.\"\"\"\n",
" if len(G) >= max_nodes:\n",
" return True\n",
" return False\n",
"\n",
"s = Node('s')\n",
"# x = Node('x')\n",
"z = Node('z')\n",
"distance_kwargs = {'goal_nodes': [z]}\n",
"pop_kwargs = {'max_edgeweight': 10, 'max_pathweight': 200, 'distance_callback': distance, \n",
" 'distance_kwargs': distance_kwargs}\n",
"abort_kwargs = {'max_nodes': 10}\n",
"G = {'s': s} # Add a few nodes to G. Others will be added in pop_callback if necessary.\n",
"\n",
"prev = dijkstras_using_queue(G, source_node=s, pop_callback=graph_gen_callback, \n",
" pop_kwargs=pop_kwargs, abort_func=abort_func, \n",
" abort_kwargs=abort_kwargs)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def graph_view_callback(node, G):\n",
" print(node, node.distance)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Optimal path:\n",
"'s' 0\n",
"'y' 5\n",
"'z' 7\n",
"'t' 8\n",
"'x' 9\n"
]
}
],
"source": [
"print('Optimal path:')\n",
"prev = dijkstras_using_queue(G, source_node=s, pop_callback=graph_view_callback)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'t': 'y', 'y': 's', 'x': 't', 'z': 'y'}"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"prev"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"hide_input": false,
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.7"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment