pyMez.Code.DataHandlers.GraphModels module
Graph Models stores sub classes of graphs that define data translations. All edges
or the functions that define translations from one format to another
are found in pyMez.Code.DataHandlers.Translations.
Currently, the module networkx is used to display the graph.
Examples
>>from pyMez import * >>image_graph=ImageGraph() >>image_graph.set_state('png','my_png.png') >>image_graph.move_to_node('EmbeddedHtml') >>output=image_graph.data >>print output
GraphModels Example
Requirements
Help
#----------------------------------------------------------------------------- # Name: GraphModels # Purpose: To store graphs used in network translations # Author: Aric Sanders # Created: 4/6/2016 # License: MIT License #----------------------------------------------------------------------------- """ Graph Models stores sub classes of graphs that define data translations. All edges or the functions that define translations from one format to another are found in <a href="./Translations.m.html">`pyMez.Code.DataHandlers.Translations`</a>. Currently, the module networkx is used to display the graph. Examples -------- #!python >>from pyMez import * >>image_graph=ImageGraph() >>image_graph.set_state('png','my_png.png') >>image_graph.move_to_node('EmbeddedHtml') >>output=image_graph.data >>print output <h3><a href="../../../Examples/Html/GraphModels_Example.html">GraphModels Example</a></h3> Requirements ------------ + [sys](https://docs.python.org/2/library/sys.html) + [os](https://docs.python.org/2/library/os.html?highlight=os#module-os) + [networkx](http://networkx.github.io/) + [numpy](http://www.numpy.org/) + [pyMez](https://github.com/aricsanders/pyMez) Help --------------- <a href="./index.html">`pyMez.Code.DataHandlers`</a> <div> <a href="../../../pyMez_Documentation.html">Documentation Home</a> | <a href="../../index.html">API Documentation Home</a> | <a href="../../../Examples/html/Examples_Home.html">Examples Home</a> | <a href="../../../Reference_Index.html">Index</a> </div> """ #----------------------------------------------------------------------------- # Standard Imports import re import datetime import sys import os #----------------------------------------------------------------------------- # Third Party Imports sys.path.append(os.path.join(os.path.dirname( __file__ ), '..','..')) try: from Code.Utils.Alias import * METHOD_ALIASES=1 except: print("The module pyMez.Code.Utils.Alias was not found") METHOD_ALIASES=0 pass try: from Code.DataHandlers.GeneralModels import * except: print("The module pyMez.Code.DataHandlers.GeneralModels was not found," "please put it on the python path") raise ImportError try: from Code.DataHandlers.TouchstoneModels import * except: print("The module pyMez.Code.DataHandlers.TouchstoneModels was not found," "please put it on the python path") raise ImportError try: from Code.DataHandlers.Translations import * except: print("The module pyMez.Code.DataHandlers.Translations was not found or had an error," "please put it on the python path or resolve the error") raise ImportError try: import numpy as np except: print("The module numpy was not found," "please put it on the python path") raise ImportError try: import networkx except: print("The module networkx was not found," "please put it on the python path") raise ImportError #----------------------------------------------------------------------------- # Module Constants #----------------------------------------------------------------------------- # Module Functions # as an example these functions are left. #todo: Change the names def edge_1_to_2(in_string): "A Test function for an edge for a Graph" return in_string.splitlines() def edge_2_to_1(string_list): """A test function for an edge in a Graph""" return string_list_collapse(string_list) def visit_all_nodes(graph): """Visit all nodes visits each node on a graph""" nodes=graph.node_names for node in nodes: graph.move_to_node(node) def visit_and_print_all_nodes(graph): """Visits all the nodes in graph and prints graph.data after each move""" nodes=graph.node_names for node in nodes: graph.move_to_node(node) print((graph.data)) def to_node_name(node_data): """Creates a node name given an input object, does a bit of silly type selecting and name rearranging. This matches for 75% of the cases. There are a lot of user defined nodes without a clear path to generate a name. For instance the DataTableGraph node HpFile, does not save with a .hp extension so it would be auto named TxtFile if was only selected by the path name. If it is auto selected it returns StringList because it is of the format ["file_path","schema_path"] """ # we retrieve the text version of the class name class_name = node_data.__class__.__name__ node_name = class_name # now for dict and list types we want to inspect the first Element to see what it is if re.match('list', class_name): node_name = "List" try: element_class_name = node_data[0].__class__.__name__ node_name = element_class_name + node_name except: pass elif re.match('dict', class_name): node_name = "Dictionary" try: element_class_name = list(node_data.values())[0].__class__.__name__ node_name = element_class_name + node_name except: pass elif re.match('str', class_name): node_name = "String" # Now we have to check if it is an existing file name if os.path.isfile(node_data): node_name = "File" extension = "" try: if re.search("\.", node_data): extension = node_data.split(".")[-1] node_name = extension.title() + node_name except: pass elif fnmatch.fnmatch(node_data, "*.*"): node_name = "File" try: if re.search("\.", node_data): extension = node_data.split(".")[-1] node_name = extension.title() + node_name except: pass node_name = node_name.replace("str", "String").replace("dict", "Dictionary") return (node_name) def TableGraph_to_Links(table_graph, **options): """Converts a table graph to a set of download links with embedded data in them""" defaults = {"base_name": None, "nodes": ['XmlFile', 'CsvFile', 'ExcelFile', 'OdsFile', 'MatFile', 'HtmlFile', 'JsonFile'], "extensions": ['xml', 'csv', 'xlsx', 'ods', 'mat', 'html', 'json'], "mime_types": ['application/xml', 'text/plain', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'application/vnd.oasis.opendocument.spreadsheet', 'application/x-matlab-data', 'text/html', 'application/json']} conversion_options = {} for key, value in defaults.items(): conversion_options[key] = value for key, value in options.items(): conversion_options[key] = value if conversion_options["base_name"] is None: base_name = 'test.txt' else: base_name = conversion_options["base_name"] nodes = conversion_options["nodes"] extensions = conversion_options["extensions"] mime_types = conversion_options["mime_types"] out_links = "" for node_index, node in enumerate(nodes): table_graph.move_to_node(node) file_path = table_graph.data in_file = open(file_path, 'rb') content_string = in_file.read() link = String_to_DownloadLink(content_string, suggested_name=change_extension(base_name, extensions[node_index]), mime_type=mime_types[node_index], text=extensions[node_index]) if node_index == len(nodes) - 1: out_links = out_links + link else: out_links = out_links + link + " | " return out_links def remove_circular_paths(path): """Removes pieces of the path that just end on the same node""" # Todo: Track the error that leaves out a needed path sometimes # See http://localhost:8888/notebooks/Two_Port_Matrix_Parameters_Debug_20170105_001.ipynb edge_pattern=re.compile("edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)") past_locations=[] for index,edge in enumerate(path): match=re.match(edge_pattern,edge) begin_node=match.groupdict()["begin_node"] end_node=match.groupdict()["end_node"] past_locations.append(begin_node) #print("{0} is {1}".format("past_locations",past_locations)) new_path=[] node_index=0 between_list=[False for item in past_locations] while(node_index<len(past_locations)): node=past_locations[node_index] old_path=new_path new_path=[] # if you visit a location more than one number_of_visits=past_locations.count(node) if number_of_visits>1: #print("{0} is {1}".format("node",node)) #print("{0} is {1}".format("past_locations",past_locations)) # Now find all the visits to that location equality_list=[x==node for x in past_locations] print(("{0} is {1}".format("equality_list",equality_list))) # You are intially not between visits between=False # every time you cross that node you flip between, as long as there are visit_number=0 for index,equality in enumerate(equality_list): if equality: # add one to the visit number visit_number+=1 # Flip the between truth value if it is the first or last # visits only if visit_number==1 or visit_number==number_of_visits: between=not between between_list[index]=between or between_list[index] else: between_list[index]=between or between_list[index] else: between_list[index]=between or between_list[index] #print("{0} is {1}".format("between_list",between_list)) for index,item in enumerate(between_list): if not item: new_path.append(path[index]) node_index+=1 if new_path in [[]]: new_path=path return new_path #----------------------------------------------------------------------------- # Module Classes # getting around to adding a breadth first graph solver to Graph class # modify the find_path method class Graph(object): """The Graph class creates a content graph that has as nodes different formats. As a format is added via graph.add_node() by specifying a node name and a function from an existing node into the new one, and one exiting the node. Once a series of nodes exists to enter the graph at a node use graph.set_state() the current data representing the state is in the attribute graph.data. To move among the formats use graph.move_to_node('NodeName') need to recode the find_path method using a shortest path alogrithm like [Dijkstra](https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm). """ def __init__(self, **options): """Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required""" defaults = {"graph_name": "Graph", "node_names": ['n1', 'n2'], "node_descriptions": ["A plain string", "A list of strings with no \\n, created with string.splitlines()"], "current_node": 'n1', "state": [1, 0], "data": "This is a test string\n it has to have multiple lines \n and many characters 34%6\n^", "edge_2_to_1": edge_2_to_1, "edge_1_to_2": edge_1_to_2 } self.options = {} for key, value in defaults.items(): self.options[key] = value for key, value in options.items(): self.options[key] = value self.elements = ['graph_name', 'node_names', 'node_descriptions', 'current_node', 'state', 'data'] for element in self.elements: self.__dict__[element] = self.options[element] self.edges = [] self.edge_matrices = [] self.state_matrix = np.matrix(self.state).T # Add the first 2 edges, required to intialize the graph properly self.display_graph = networkx.DiGraph() self.add_edge(self.node_names[0], self.node_names[1], self.options["edge_1_to_2"]) self.add_edge(self.node_names[1], self.node_names[0], self.options["edge_2_to_1"]) self.jumps = [] self.external_node_names = [] self.external_node_descriptions = [] self.display_layout = networkx.spring_layout(self.display_graph) def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph) def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph) def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T # print self.state # print self.current_node def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1 # print temp_state # print self.state # print self.current_node def __str__(self): return str(self.data) def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph) def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph) def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options) def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue))) def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path) def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig class StringGraph(Graph): """String Graph is a graph relating different string forms""" def __init__(self,**options): """Intializes the StringGraph Class by defining nodes and edges""" defaults={"graph_name":"StringGraph", "node_names":['String','StringList'], "node_descriptions":["A plain string", "A list of strings with no \\n, created with string.splitlines()"], "current_node":'String', "state":[1,0], "data":"This is a test string\n it has to have multiple lines \n and many characters 34%6\n^", "edge_2_to_1":edge_2_to_1, "edge_1_to_2":edge_1_to_2 } self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("File","String",String_to_File,"String",File_to_String,node_description="Plain File") self.add_node("CStringIo","String",String_to_CStringIo,"String",CStringIo_to_String,node_description="C File Like Object") self.add_node("StringIo","String",String_to_StringIo,"String",StringIo_to_String,node_description="File Like Object") self.add_edge(begin_node="StringList",end_node="File",edge_function=StringList_to_File) # Changed from ColumnModeledGraph to TableGraph 12/14/2016 by AWS class TableGraph(Graph): """Class that transforms column modeled data (table) from one format to another, use set_state to initialize to your data. #!python defaults={"graph_name":"Table Graph", "node_names":['DataFrame','AsciiDataTable'], "node_descriptions":["Pandas Data Frame","AsciiDataTable"], "current_node":'DataFrame', "state":[1,0], "data":pandas.DataFrame([[1,2,3],[3,4,5]],columns=["a","b","c"]), "edge_2_to_1":AsciiDataTable_to_DataFrame, "edge_1_to_2":DataFrame_to_AsciiDataTable} """ def __init__(self,**options): defaults={"graph_name":"Table Graph", "node_names":['DataFrame','AsciiDataTable'], "node_descriptions":["Pandas Data Frame","AsciiDataTable"], "current_node":'DataFrame', "state":[1,0], "data":pandas.DataFrame([[1,2,3],[3,4,5]],columns=["a","b","c"]), "edge_2_to_1":AsciiDataTable_to_DataFrame, "edge_1_to_2":DataFrame_to_AsciiDataTable} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("HdfFile","DataFrame",DataFrame_to_HdfFile, "DataFrame",HdfFile_to_DataFrame, node_description="HDF File") self.add_node("XmlDataTable","AsciiDataTable",AsciiDataTable_to_XmlDataTable, "AsciiDataTable",XmlDataTable_to_AsciiDataTable, node_description="XML Data Table") # Need to add XML File and Html File using save and save_HTML() self.add_node("ExcelFile","DataFrame",DataFrame_to_ExcelFile, "DataFrame",ExcelFile_to_DataFrame, node_description="Excel File") self.add_node("OdsFile","ExcelFile",ExcelFile_to_OdsFile, "ExcelFile",OdsFile_to_ExcelFile,"Open Office Spreadsheet") self.add_node("HtmlString","DataFrame",DataFrame_to_HtmlString, "DataFrame",HtmlString_to_DataFrame, node_description="HTML String") # Note a lot of the pandas reading and writing cause float64 round off errors # applymap(lambda x: np.around(x,10) any all float fields will fix this # also the column names move about in order self.add_node("JsonFile","DataFrame",DataFrame_to_JsonFile, "DataFrame",JsonFile_to_DataFrame, node_description="JSON File") self.add_node("JsonString","DataFrame",DataFrame_to_JsonString, "DataFrame",JsonString_to_DataFrame, node_description="JSON String") self.add_node("CsvFile","DataFrame",DataFrame_to_CsvFile, "DataFrame",CsvFile_to_DataFrame, node_description="CSV File") self.add_node("MatFile","AsciiDataTable",AsciiTable_to_MatFile, "AsciiDataTable",MatFile_to_AsciiTable, node_description="Matlab File") self.add_node("XmlFile","XmlDataTable",XmlDataTable_to_XmlFile, "XmlDataTable",XmlFile_to_XmlDataTable, node_description="XML DataTable Saved As a File") self.add_node("HtmlFile","HtmlString",HtmlString_to_HtmlFile, "HtmlString",HtmlFile_to_HtmlString, node_description="HTML File") self.add_edge("DataFrame","HtmlFile",DataFrame_to_HtmlFile) self.add_edge("JsonFile","XmlDataTable",JsonFile_to_XmlDataTable) self.add_external_node("XsltResultString","XmlDataTable",XmlBase_to_XsltResultString, external_node_description="XSLT Results String") self.add_external_node("XsltResultFile","XmlDataTable",XmlBase_to_XsltResultFile, external_node_description="XSLT Results File") class ImageGraph(Graph): """A transformation graph for images node types are image formats and external nodes are common image processing functions #!python defaults={"graph_name":"Image Graph", "node_names":['Image','png'], "node_descriptions":["PIL Image","png"], "current_node":'Image', "state":[1,0], "data":PIL.Image.open(os.path.join(TESTS_DIRECTORY,'test.png')), "edge_2_to_1":File_to_Image, "edge_1_to_2":lambda x: Image_to_FileType(x,file_path="test",extension="png")} """ def __init__(self,**options): defaults={"graph_name":"Image Graph", "node_names":['Image','Png'], "node_descriptions":["PIL Image","Png"], "current_node":'Image', "state":[1,0], "data":PIL.Image.open(os.path.join(TESTS_DIRECTORY,'test.png')), "edge_2_to_1":File_to_Image, "edge_1_to_2":lambda x: Image_to_FileType(x,file_path="test",extension="png")} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("Jpg","Image",lambda x: Image_to_FileType(x,file_path="test",extension="jpg"), "Image",File_to_Image,node_description="Jpg File") self.add_node("Tiff","Image",lambda x: Image_to_FileType(x,file_path="test",extension="tiff"), "Image",File_to_Image,node_description="Tif File") self.add_node("Gif","Image",lambda x: Image_to_FileType(x,file_path="test",extension="gif"), "Image",File_to_Image,node_description="Gif File") self.add_node("Bmp","Image",lambda x: Image_to_FileType(x,file_path="test",extension="bmp"), "Image",File_to_Image,node_description="BMP File") self.add_node("Base64","Png",PngFile_to_Base64, "Png",Base64_to_PngFile,node_description="Base 64 PNG") self.add_node("EmbeddedHtml","Base64",Base64Png_to_EmbeddedHtmlString, "Base64",EmbeddedHtmlString_to_Base64Png,node_description="Embedded HTML of PNG") self.add_node("Ndarray","Png",PngFile_to_Ndarray, "Png",Ndarray_to_PngFile,node_description="Numpy Array") self.add_node("MatplotlibFigure","Ndarray",Ndarray_to_MatplotlibFigure, "Png",MatplotlibFigure_to_PngFile,node_description="MatplotlibFigure") self.add_external_node("Thumbnail","Image",Image_to_ThumbnailFile,external_node_description="JPEG Thumbnail") self.add_external_node("Matplotlib","Ndarray",Ndarray_to_Matplotlib, external_node_description="Matplotlib Plot") class MetadataGraph(Graph): """Metadata Graph is a graph representing the content of key,value metadata""" def __init__(self,**options): """Intializes the metadata graph class""" defaults={"graph_name":"Metadata Graph", "node_names":['Dictionary','JsonString'], "node_descriptions":["Python Dictionary","Json string"], "current_node":'Dictionary', "state":[1,0], "data":{"a":"First","b":"Second"}, "edge_2_to_1":JsonString_to_Dictionary, "edge_1_to_2":Dictionary_to_JsonString} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("JsonFile","JsonString",JsonString_to_JsonFile, "JsonString",JsonFile_to_JsonString,node_description="JSON File") self.add_node("XmlString","Dictionary",Dictionary_to_XmlString, "Dictionary",XmlString_to_Dictionary,node_description="XML string") self.add_node("HtmlMetaString","Dictionary",Dictionary_to_HtmlMetaString, "Dictionary",HtmlMetaString_to_Dictionary,node_description="HTML meta tags") self.add_node("XmlTupleString","Dictionary",Dictionary_to_XmlTupleString, "Dictionary",XmlTupleString_to_Dictionary,node_description="Tuple Line") self.add_node("PickleFile","Dictionary",Dictionary_to_PickleFile, "Dictionary",PickleFile_to_Dictionary,node_description="Pickled File") self.add_node("ListList","Dictionary",Dictionary_to_ListList, "Dictionary",ListList_to_Dictionary,node_description="List of lists") self.add_node("HeaderList","Dictionary",Dictionary_to_HeaderList, "Dictionary",HeaderList_to_Dictionary,node_description="Header List") self.add_node("DataFrame","Dictionary",Dictionary_to_DataFrame, "Dictionary",DataFrame_to_Dictionary,node_description="Pandas DataFrame") self.add_node("AsciiDataTable","DataFrame",DataFrame_to_AsciiDataTable, "DataFrame",AsciiDataTable_to_DataFrame,node_description="AsciiDataTable") self.add_node("MatFile","AsciiDataTable",AsciiTable_to_MatFile, "AsciiDataTable",MatFile_to_AsciiDataTableKeyValue,node_description="Matlab") self.add_node("ExcelFile","DataFrame",DataFrame_to_ExcelFile, "DataFrame",ExcelFile_to_DataFrame,node_description="excel") self.add_node("HdfFile","DataFrame",DataFrame_to_HdfFile, "DataFrame",HdfFile_to_DataFrame,node_description="hdf file") self.add_node("CsvFile","DataFrame",DataFrame_to_CsvFile, "DataFrame",CsvFile_to_DataFrame,node_description="CSV File") self.add_node("HtmlFile","DataFrame",DataFrame_to_HtmlFile, "DataFrame",HtmlFile_to_DataFrame,node_description="HTML Table File") self.add_node("HtmlTableString","HtmlFile",HtmlFile_to_HtmlString, "HtmlFile",HtmlString_to_HtmlFile,node_description="HTML Table String") class TwoPortParameterGraph(Graph): """TwoPortParamterGraph is a content graph for two-port parameters, it transforms between S,T,Y,Z,ABCD and H parameters and matrix versions. #!python defaults={"graph_name":"Two Port Parameter Graph", "node_names":["SFrequencyList",'SFrequencyMatrixList'], "node_descriptions":["S Parameters","S Parameters in a Matrix"], "current_node":'SFrequencyList', "state":[1,0], "data":[[1.0,.9,.436,.436,.9]], "edge_2_to_1":FrequencyMatrixList_to_FrequencyList, "edge_1_to_2":FrequencyList_to_FrequencyMatrixList, "frequency_units":"GHz", "Z01":50, "Z02":50 } """ def __init__(self,**options): defaults={"graph_name":"Two Port Parameter Graph", "node_names":["SFrequencyList",'SFrequencyMatrixList'], "node_descriptions":["S Parameters","S Parameters in a Matrix"], "current_node":'SFrequencyList', "state":[1,0], "data":[[1.0,.9,.436,.436,.9]], "edge_2_to_1":FrequencyMatrixList_to_FrequencyList, "edge_1_to_2":FrequencyList_to_FrequencyMatrixList, "frequency_units":"GHz", "Z01":50, "Z02":50 } graph_options={} for key,value in defaults.items(): graph_options[key]=value for key,value in options.items(): graph_options[key]=value Graph.__init__(self,**graph_options) self.add_node("TFrequencyMatrixList", "SFrequencyMatrixList",SFrequencyMatrixList_to_TFrequencyMatrixList, "SFrequencyMatrixList",TFrequencyMatrixList_to_SFrequencyMatrixList, "T Parameters in a Matrix") self.add_node("TFrequencyList", "TFrequencyMatrixList",FrequencyMatrixList_to_FrequencyList, "TFrequencyMatrixList",FrequencyList_to_FrequencyMatrixList, "T Parameters") self.add_node("ZFrequencyList", "SFrequencyList",SFrequencyList_to_ZFrequencyList, "TFrequencyList",ZFrequencyList_to_TFrequencyList, "Z Parameters") self.add_node("ZFrequencyMatrixList", "ZFrequencyList",FrequencyList_to_FrequencyMatrixList, "ZFrequencyList",FrequencyMatrixList_to_FrequencyList, "Z Parameters in a matrix") self.add_node("ABCDFrequencyList", "ZFrequencyList",ZFrequencyList_to_ABCDFrequencyList, "ZFrequencyList",ABCDFrequencyList_to_ZFrequencyList, "ABCD Parameters") self.add_node("ABCDFrequencyMatrixList", "ABCDFrequencyList",FrequencyList_to_FrequencyMatrixList, "ABCDFrequencyList",FrequencyMatrixList_to_FrequencyList, "ABCD Parameters in a matrix") self.add_node("HFrequencyList", "ABCDFrequencyList",ABCDFrequencyList_to_HFrequencyList, "ZFrequencyList",HFrequencyList_to_ZFrequencyList, "h Parameters") self.add_node("HFrequencyMatrixList", "HFrequencyList",FrequencyList_to_FrequencyMatrixList, "HFrequencyList",FrequencyMatrixList_to_FrequencyList, "H Parameters in a matrix") self.add_node("YFrequencyList", "ABCDFrequencyList",ABCDFrequencyList_to_YFrequencyList, "HFrequencyList",YFrequencyList_to_HFrequencyList, "Y Parameters") self.add_node("YFrequencyMatrixList", "YFrequencyList",FrequencyList_to_FrequencyMatrixList, "YFrequencyList",FrequencyMatrixList_to_FrequencyList, "Y Parameters in a matrix") self.add_edge(begin_node="ZFrequencyMatrixList", end_node="YFrequencyMatrixList", edge_function=ZFrequencyMatrixList_to_YFrequencyMatrixList) self.add_edge(begin_node="SFrequencyMatrixList", end_node="ZFrequencyMatrixList", edge_function=SFrequencyMatrixList_to_ZFrequencyMatrixList) self.add_edge(begin_node="ZFrequencyMatrixList", end_node="TFrequencyMatrixList", edge_function=ZFrequencyMatrixList_to_TFrequencyMatrixList) self.add_edge(begin_node="ABCDFrequencyList", end_node="SFrequencyList", edge_function=ABCDFrequencyList_to_SFrequencyList) class DataTableGraph(Graph): """ Class that transforms a row modelled header and metadata to several different data types #!python defaults={"graph_name":"Data Table Graph", "node_names":['DataFrameDictionary','AsciiDataTable'], "node_descriptions":["Pandas Data Frame Dictionary","AsciiDataTable"], "current_node":'DataFrameDictionary', "state":[1,0], "data":AsciiDataTable_to_DataFrameDictionary(TwoPortRawModel(os.path.join(TESTS_DIRECTORY,'TestFileTwoPortRaw.txt'))), "edge_2_to_1":AsciiDataTable_to_DataFrameDictionary, "edge_1_to_2":DataFrameDictionary_to_AsciiDataTable } """ def __init__(self,**options): defaults={"graph_name":"Data Table Graph", "node_names":['DataFrameDictionary','AsciiDataTable'], "node_descriptions":["Pandas Data Frame Dictionary","AsciiDataTable"], "current_node":'DataFrameDictionary', "state":[1,0], "data":AsciiDataTable_to_DataFrameDictionary(TwoPortRawModel(os.path.join(TESTS_DIRECTORY,'TestFileTwoPortRaw.txt'))), "edge_2_to_1":AsciiDataTable_to_DataFrameDictionary, "edge_1_to_2":DataFrameDictionary_to_AsciiDataTable } graph_options={} for key,value in defaults.items(): graph_options[key]=value for key,value in options.items(): graph_options[key]=value Graph.__init__(self, **graph_options) self.add_node("ExcelFile", "DataFrameDictionary", DataFrameDictionary_to_ExcelFile, "DataFrameDictionary", ExcelFile_to_DataFrameDictionary, node_description="Excel Workbook") self.add_node("HdfFile", "DataFrameDictionary", DataFrameDictionary_to_HdfFile, "DataFrameDictionary", HdfFile_to_DataFrameDictionary, node_description="HD5 File") self.add_node("CsvFile", "AsciiDataTable", AsciiDataTable_to_CsvFile, "AsciiDataTable", File_to_AsciiDataTable, node_description="CSV File") self.add_node("HpFile", "AsciiDataTable", AsciiDataTable_to_HpFile, "AsciiDataTable", File_to_AsciiDataTable, node_description="hp format File") self.add_external_node(external_node_name="XMLDataTable", jump_into_node_begin="AsciiDataTable", jump_into_node_function=AsciiDataTable_to_XmlDataTable, external_node_description="XMLDataTable") #----------------------------------------------------------------------------- # Module Scripts #TODO: Add test_Graph script currently lives in jupyter-notebooks #----------------------------------------------------------------------------- # Module Runner if __name__ == '__main__': pass
Functions
def TableGraph_to_Links(
table_graph, **options)
Converts a table graph to a set of download links with embedded data in them
def TableGraph_to_Links(table_graph, **options): """Converts a table graph to a set of download links with embedded data in them""" defaults = {"base_name": None, "nodes": ['XmlFile', 'CsvFile', 'ExcelFile', 'OdsFile', 'MatFile', 'HtmlFile', 'JsonFile'], "extensions": ['xml', 'csv', 'xlsx', 'ods', 'mat', 'html', 'json'], "mime_types": ['application/xml', 'text/plain', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'application/vnd.oasis.opendocument.spreadsheet', 'application/x-matlab-data', 'text/html', 'application/json']} conversion_options = {} for key, value in defaults.items(): conversion_options[key] = value for key, value in options.items(): conversion_options[key] = value if conversion_options["base_name"] is None: base_name = 'test.txt' else: base_name = conversion_options["base_name"] nodes = conversion_options["nodes"] extensions = conversion_options["extensions"] mime_types = conversion_options["mime_types"] out_links = "" for node_index, node in enumerate(nodes): table_graph.move_to_node(node) file_path = table_graph.data in_file = open(file_path, 'rb') content_string = in_file.read() link = String_to_DownloadLink(content_string, suggested_name=change_extension(base_name, extensions[node_index]), mime_type=mime_types[node_index], text=extensions[node_index]) if node_index == len(nodes) - 1: out_links = out_links + link else: out_links = out_links + link + " | " return out_links
def edge_1_to_2(
in_string)
A Test function for an edge for a Graph
def edge_1_to_2(in_string): "A Test function for an edge for a Graph" return in_string.splitlines()
def edge_2_to_1(
string_list)
A test function for an edge in a Graph
def edge_2_to_1(string_list): """A test function for an edge in a Graph""" return string_list_collapse(string_list)
def remove_circular_paths(
path)
Removes pieces of the path that just end on the same node
def remove_circular_paths(path): """Removes pieces of the path that just end on the same node""" # Todo: Track the error that leaves out a needed path sometimes # See http://localhost:8888/notebooks/Two_Port_Matrix_Parameters_Debug_20170105_001.ipynb edge_pattern=re.compile("edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)") past_locations=[] for index,edge in enumerate(path): match=re.match(edge_pattern,edge) begin_node=match.groupdict()["begin_node"] end_node=match.groupdict()["end_node"] past_locations.append(begin_node) #print("{0} is {1}".format("past_locations",past_locations)) new_path=[] node_index=0 between_list=[False for item in past_locations] while(node_index<len(past_locations)): node=past_locations[node_index] old_path=new_path new_path=[] # if you visit a location more than one number_of_visits=past_locations.count(node) if number_of_visits>1: #print("{0} is {1}".format("node",node)) #print("{0} is {1}".format("past_locations",past_locations)) # Now find all the visits to that location equality_list=[x==node for x in past_locations] print(("{0} is {1}".format("equality_list",equality_list))) # You are intially not between visits between=False # every time you cross that node you flip between, as long as there are visit_number=0 for index,equality in enumerate(equality_list): if equality: # add one to the visit number visit_number+=1 # Flip the between truth value if it is the first or last # visits only if visit_number==1 or visit_number==number_of_visits: between=not between between_list[index]=between or between_list[index] else: between_list[index]=between or between_list[index] else: between_list[index]=between or between_list[index] #print("{0} is {1}".format("between_list",between_list)) for index,item in enumerate(between_list): if not item: new_path.append(path[index]) node_index+=1 if new_path in [[]]: new_path=path return new_path
def to_node_name(
node_data)
Creates a node name given an input object, does a bit of silly type selecting and name rearranging. This matches for 75% of the cases. There are a lot of user defined nodes without a clear path to generate a name. For instance the DataTableGraph node HpFile, does not save with a .hp extension so it would be auto named TxtFile if was only selected by the path name. If it is auto selected it returns StringList because it is of the format ["file_path","schema_path"]
def to_node_name(node_data): """Creates a node name given an input object, does a bit of silly type selecting and name rearranging. This matches for 75% of the cases. There are a lot of user defined nodes without a clear path to generate a name. For instance the DataTableGraph node HpFile, does not save with a .hp extension so it would be auto named TxtFile if was only selected by the path name. If it is auto selected it returns StringList because it is of the format ["file_path","schema_path"] """ # we retrieve the text version of the class name class_name = node_data.__class__.__name__ node_name = class_name # now for dict and list types we want to inspect the first Element to see what it is if re.match('list', class_name): node_name = "List" try: element_class_name = node_data[0].__class__.__name__ node_name = element_class_name + node_name except: pass elif re.match('dict', class_name): node_name = "Dictionary" try: element_class_name = list(node_data.values())[0].__class__.__name__ node_name = element_class_name + node_name except: pass elif re.match('str', class_name): node_name = "String" # Now we have to check if it is an existing file name if os.path.isfile(node_data): node_name = "File" extension = "" try: if re.search("\.", node_data): extension = node_data.split(".")[-1] node_name = extension.title() + node_name except: pass elif fnmatch.fnmatch(node_data, "*.*"): node_name = "File" try: if re.search("\.", node_data): extension = node_data.split(".")[-1] node_name = extension.title() + node_name except: pass node_name = node_name.replace("str", "String").replace("dict", "Dictionary") return (node_name)
def visit_all_nodes(
graph)
Visit all nodes visits each node on a graph
def visit_all_nodes(graph): """Visit all nodes visits each node on a graph""" nodes=graph.node_names for node in nodes: graph.move_to_node(node)
def visit_and_print_all_nodes(
graph)
Visits all the nodes in graph and prints graph.data after each move
def visit_and_print_all_nodes(graph): """Visits all the nodes in graph and prints graph.data after each move""" nodes=graph.node_names for node in nodes: graph.move_to_node(node) print((graph.data))
Classes
class DataTableGraph
Class that transforms a row modelled header and metadata to several different data types
!python
defaults={"graph_name":"Data Table Graph", "node_names":['DataFrameDictionary','AsciiDataTable'], "node_descriptions":["Pandas Data Frame Dictionary","AsciiDataTable"], "current_node":'DataFrameDictionary', "state":[1,0], "data":AsciiDataTable_to_DataFrameDictionary(TwoPortRawModel(os.path.join(TESTS_DIRECTORY,'TestFileTwoPortRaw.txt'))), "edge_2_to_1":AsciiDataTable_to_DataFrameDictionary, "edge_1_to_2":DataFrameDictionary_to_AsciiDataTable }
class DataTableGraph(Graph): """ Class that transforms a row modelled header and metadata to several different data types #!python defaults={"graph_name":"Data Table Graph", "node_names":['DataFrameDictionary','AsciiDataTable'], "node_descriptions":["Pandas Data Frame Dictionary","AsciiDataTable"], "current_node":'DataFrameDictionary', "state":[1,0], "data":AsciiDataTable_to_DataFrameDictionary(TwoPortRawModel(os.path.join(TESTS_DIRECTORY,'TestFileTwoPortRaw.txt'))), "edge_2_to_1":AsciiDataTable_to_DataFrameDictionary, "edge_1_to_2":DataFrameDictionary_to_AsciiDataTable } """ def __init__(self,**options): defaults={"graph_name":"Data Table Graph", "node_names":['DataFrameDictionary','AsciiDataTable'], "node_descriptions":["Pandas Data Frame Dictionary","AsciiDataTable"], "current_node":'DataFrameDictionary', "state":[1,0], "data":AsciiDataTable_to_DataFrameDictionary(TwoPortRawModel(os.path.join(TESTS_DIRECTORY,'TestFileTwoPortRaw.txt'))), "edge_2_to_1":AsciiDataTable_to_DataFrameDictionary, "edge_1_to_2":DataFrameDictionary_to_AsciiDataTable } graph_options={} for key,value in defaults.items(): graph_options[key]=value for key,value in options.items(): graph_options[key]=value Graph.__init__(self, **graph_options) self.add_node("ExcelFile", "DataFrameDictionary", DataFrameDictionary_to_ExcelFile, "DataFrameDictionary", ExcelFile_to_DataFrameDictionary, node_description="Excel Workbook") self.add_node("HdfFile", "DataFrameDictionary", DataFrameDictionary_to_HdfFile, "DataFrameDictionary", HdfFile_to_DataFrameDictionary, node_description="HD5 File") self.add_node("CsvFile", "AsciiDataTable", AsciiDataTable_to_CsvFile, "AsciiDataTable", File_to_AsciiDataTable, node_description="CSV File") self.add_node("HpFile", "AsciiDataTable", AsciiDataTable_to_HpFile, "AsciiDataTable", File_to_AsciiDataTable, node_description="hp format File") self.add_external_node(external_node_name="XMLDataTable", jump_into_node_begin="AsciiDataTable", jump_into_node_function=AsciiDataTable_to_XmlDataTable, external_node_description="XMLDataTable")
Ancestors (in MRO)
- DataTableGraph
- Graph
- __builtin__.object
Instance variables
Methods
def __init__(
self, **options)
Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required
def __init__(self,**options): defaults={"graph_name":"Data Table Graph", "node_names":['DataFrameDictionary','AsciiDataTable'], "node_descriptions":["Pandas Data Frame Dictionary","AsciiDataTable"], "current_node":'DataFrameDictionary', "state":[1,0], "data":AsciiDataTable_to_DataFrameDictionary(TwoPortRawModel(os.path.join(TESTS_DIRECTORY,'TestFileTwoPortRaw.txt'))), "edge_2_to_1":AsciiDataTable_to_DataFrameDictionary, "edge_1_to_2":DataFrameDictionary_to_AsciiDataTable } graph_options={} for key,value in defaults.items(): graph_options[key]=value for key,value in options.items(): graph_options[key]=value Graph.__init__(self, **graph_options) self.add_node("ExcelFile", "DataFrameDictionary", DataFrameDictionary_to_ExcelFile, "DataFrameDictionary", ExcelFile_to_DataFrameDictionary, node_description="Excel Workbook") self.add_node("HdfFile", "DataFrameDictionary", DataFrameDictionary_to_HdfFile, "DataFrameDictionary", HdfFile_to_DataFrameDictionary, node_description="HD5 File") self.add_node("CsvFile", "AsciiDataTable", AsciiDataTable_to_CsvFile, "AsciiDataTable", File_to_AsciiDataTable, node_description="CSV File") self.add_node("HpFile", "AsciiDataTable", AsciiDataTable_to_HpFile, "AsciiDataTable", File_to_AsciiDataTable, node_description="hp format File") self.add_external_node(external_node_name="XMLDataTable", jump_into_node_begin="AsciiDataTable", jump_into_node_function=AsciiDataTable_to_XmlDataTable, external_node_description="XMLDataTable")
def add_edge(
self, begin_node=None, end_node=None, edge_function=None)
Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function
def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_external_node(
self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None)
Inheritance:
Graph.add_external_node
Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node
def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def add_jump(
self, begin_node=None, end_node=None, jump_function=None)
Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function
def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_node(
self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None)
Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.
def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def check_closed_path(
self)
Inheritance:
Graph.check_closed_path
Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node
def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out
def get_description_dictionary(
self)
Inheritance:
Graph.get_description_dictionary
returns a dictionary of the form {NodeName:Node Description for all of the current nodes
def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary
def get_entering_edges(
self, node)
Inheritance:
Graph.get_entering_edges
Returns all edges that enter the specificed node
def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges
def get_entering_nodes(
self, node)
Inheritance:
Graph.get_entering_nodes
Returns all nodes that have an edge that enter the specificed node
def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes
def get_exiting_edges(
self, node)
Inheritance:
Graph.get_exiting_edges
Returns all edges that exit the specificed node
def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges
def get_exiting_nodes(
self, node)
Inheritance:
Graph.get_exiting_nodes
Returns all nodes that have an edge leaving the specificed node
def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes
def get_path(
self, first_node, last_node, **options)
Returns the first path found between first node and last node, uses a breadth first search algorithm
def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue)))
def is_graph_isomorphic(
self)
Inheritance:
Graph.is_graph_isomorphic
Returns True if all nodes have closed paths that preserve the data, False otherwise
def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out
def is_path_valid(
self, path)
Inheritance:
Graph.is_path_valid
Returns True if the path is valid from the current node position or False otherwise
def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True
def jump_to_external_node(
self, external_node_name, **options)
Inheritance:
Graph.jump_to_external_node
Returns the result of the jump, the graph is left in the node that is the begining of the jump
def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options)
def move_to(
self, path, **options)
Changes the state of the graph by moving along the path specified
def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T
def move_to_node(
self, node)
Inheritance:
Graph.move_to_node
Moves from current_node to the specified node
def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path)
def path_length(
self, path, num_repeats=10)
Inheritance:
Graph.path_length
Determines the length of a given path, currently the metric is based on the time to move to.
def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length
def set_state(
self, node_name, node_data)
Sets the graph state to be the state specified by node_name, and node_data
def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise
def show(
self, **options)
Shows the graph using matplotlib and networkx
def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
def virtual_move_to(
self, path)
Inheritance:
Graph.virtual_move_to
virtual_move_to simulates moving but does not change the state of the graph
def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1
class Graph
The Graph class creates a content graph that has as nodes different formats. As a format is added via graph.add_node() by specifying a node name and a function from an existing node into the new one, and one exiting the node. Once a series of nodes exists to enter the graph at a node use graph.set_state() the current data representing the state is in the attribute graph.data. To move among the formats use graph.move_to_node('NodeName') need to recode the find_path method using a shortest path alogrithm like Dijkstra.
class Graph(object): """The Graph class creates a content graph that has as nodes different formats. As a format is added via graph.add_node() by specifying a node name and a function from an existing node into the new one, and one exiting the node. Once a series of nodes exists to enter the graph at a node use graph.set_state() the current data representing the state is in the attribute graph.data. To move among the formats use graph.move_to_node('NodeName') need to recode the find_path method using a shortest path alogrithm like [Dijkstra](https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm). """ def __init__(self, **options): """Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required""" defaults = {"graph_name": "Graph", "node_names": ['n1', 'n2'], "node_descriptions": ["A plain string", "A list of strings with no \\n, created with string.splitlines()"], "current_node": 'n1', "state": [1, 0], "data": "This is a test string\n it has to have multiple lines \n and many characters 34%6\n^", "edge_2_to_1": edge_2_to_1, "edge_1_to_2": edge_1_to_2 } self.options = {} for key, value in defaults.items(): self.options[key] = value for key, value in options.items(): self.options[key] = value self.elements = ['graph_name', 'node_names', 'node_descriptions', 'current_node', 'state', 'data'] for element in self.elements: self.__dict__[element] = self.options[element] self.edges = [] self.edge_matrices = [] self.state_matrix = np.matrix(self.state).T # Add the first 2 edges, required to intialize the graph properly self.display_graph = networkx.DiGraph() self.add_edge(self.node_names[0], self.node_names[1], self.options["edge_1_to_2"]) self.add_edge(self.node_names[1], self.node_names[0], self.options["edge_2_to_1"]) self.jumps = [] self.external_node_names = [] self.external_node_descriptions = [] self.display_layout = networkx.spring_layout(self.display_graph) def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph) def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph) def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T # print self.state # print self.current_node def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1 # print temp_state # print self.state # print self.current_node def __str__(self): return str(self.data) def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph) def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph) def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options) def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue))) def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path) def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
Ancestors (in MRO)
- Graph
- __builtin__.object
Instance variables
var display_graph
var display_layout
var edge_matrices
var edges
var elements
var external_node_descriptions
var external_node_names
var jumps
var options
var state_matrix
Methods
def __init__(
self, **options)
Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required
def __init__(self, **options): """Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required""" defaults = {"graph_name": "Graph", "node_names": ['n1', 'n2'], "node_descriptions": ["A plain string", "A list of strings with no \\n, created with string.splitlines()"], "current_node": 'n1', "state": [1, 0], "data": "This is a test string\n it has to have multiple lines \n and many characters 34%6\n^", "edge_2_to_1": edge_2_to_1, "edge_1_to_2": edge_1_to_2 } self.options = {} for key, value in defaults.items(): self.options[key] = value for key, value in options.items(): self.options[key] = value self.elements = ['graph_name', 'node_names', 'node_descriptions', 'current_node', 'state', 'data'] for element in self.elements: self.__dict__[element] = self.options[element] self.edges = [] self.edge_matrices = [] self.state_matrix = np.matrix(self.state).T # Add the first 2 edges, required to intialize the graph properly self.display_graph = networkx.DiGraph() self.add_edge(self.node_names[0], self.node_names[1], self.options["edge_1_to_2"]) self.add_edge(self.node_names[1], self.node_names[0], self.options["edge_2_to_1"]) self.jumps = [] self.external_node_names = [] self.external_node_descriptions = [] self.display_layout = networkx.spring_layout(self.display_graph)
def add_edge(
self, begin_node=None, end_node=None, edge_function=None)
Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function
def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_external_node(
self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None)
Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node
def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def add_jump(
self, begin_node=None, end_node=None, jump_function=None)
Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function
def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_node(
self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None)
Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.
def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def check_closed_path(
self)
Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node
def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out
def get_description_dictionary(
self)
returns a dictionary of the form {NodeName:Node Description for all of the current nodes
def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary
def get_entering_edges(
self, node)
Returns all edges that enter the specificed node
def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges
def get_entering_nodes(
self, node)
Returns all nodes that have an edge that enter the specificed node
def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes
def get_exiting_edges(
self, node)
Returns all edges that exit the specificed node
def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges
def get_exiting_nodes(
self, node)
Returns all nodes that have an edge leaving the specificed node
def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes
def get_path(
self, first_node, last_node, **options)
Returns the first path found between first node and last node, uses a breadth first search algorithm
def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue)))
def is_graph_isomorphic(
self)
Returns True if all nodes have closed paths that preserve the data, False otherwise
def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out
def is_path_valid(
self, path)
Returns True if the path is valid from the current node position or False otherwise
def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True
def jump_to_external_node(
self, external_node_name, **options)
Returns the result of the jump, the graph is left in the node that is the begining of the jump
def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options)
def move_to(
self, path, **options)
Changes the state of the graph by moving along the path specified
def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T
def move_to_node(
self, node)
Moves from current_node to the specified node
def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path)
def path_length(
self, path, num_repeats=10)
Determines the length of a given path, currently the metric is based on the time to move to.
def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length
def set_state(
self, node_name, node_data)
Sets the graph state to be the state specified by node_name, and node_data
def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise
def show(
self, **options)
Shows the graph using matplotlib and networkx
def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
def virtual_move_to(
self, path)
virtual_move_to simulates moving but does not change the state of the graph
def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1
class ImageGraph
A transformation graph for images node types are image formats and external nodes are common image processing functions #!python defaults={"graph_name":"Image Graph", "node_names":['Image','png'], "node_descriptions":["PIL Image","png"], "current_node":'Image', "state":[1,0], "data":PIL.Image.open(os.path.join(TESTS_DIRECTORY,'test.png')), "edge_2_to_1":File_to_Image, "edge_1_to_2":lambda x: Image_to_FileType(x,file_path="test",extension="png")}
class ImageGraph(Graph): """A transformation graph for images node types are image formats and external nodes are common image processing functions #!python defaults={"graph_name":"Image Graph", "node_names":['Image','png'], "node_descriptions":["PIL Image","png"], "current_node":'Image', "state":[1,0], "data":PIL.Image.open(os.path.join(TESTS_DIRECTORY,'test.png')), "edge_2_to_1":File_to_Image, "edge_1_to_2":lambda x: Image_to_FileType(x,file_path="test",extension="png")} """ def __init__(self,**options): defaults={"graph_name":"Image Graph", "node_names":['Image','Png'], "node_descriptions":["PIL Image","Png"], "current_node":'Image', "state":[1,0], "data":PIL.Image.open(os.path.join(TESTS_DIRECTORY,'test.png')), "edge_2_to_1":File_to_Image, "edge_1_to_2":lambda x: Image_to_FileType(x,file_path="test",extension="png")} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("Jpg","Image",lambda x: Image_to_FileType(x,file_path="test",extension="jpg"), "Image",File_to_Image,node_description="Jpg File") self.add_node("Tiff","Image",lambda x: Image_to_FileType(x,file_path="test",extension="tiff"), "Image",File_to_Image,node_description="Tif File") self.add_node("Gif","Image",lambda x: Image_to_FileType(x,file_path="test",extension="gif"), "Image",File_to_Image,node_description="Gif File") self.add_node("Bmp","Image",lambda x: Image_to_FileType(x,file_path="test",extension="bmp"), "Image",File_to_Image,node_description="BMP File") self.add_node("Base64","Png",PngFile_to_Base64, "Png",Base64_to_PngFile,node_description="Base 64 PNG") self.add_node("EmbeddedHtml","Base64",Base64Png_to_EmbeddedHtmlString, "Base64",EmbeddedHtmlString_to_Base64Png,node_description="Embedded HTML of PNG") self.add_node("Ndarray","Png",PngFile_to_Ndarray, "Png",Ndarray_to_PngFile,node_description="Numpy Array") self.add_node("MatplotlibFigure","Ndarray",Ndarray_to_MatplotlibFigure, "Png",MatplotlibFigure_to_PngFile,node_description="MatplotlibFigure") self.add_external_node("Thumbnail","Image",Image_to_ThumbnailFile,external_node_description="JPEG Thumbnail") self.add_external_node("Matplotlib","Ndarray",Ndarray_to_Matplotlib, external_node_description="Matplotlib Plot")
Ancestors (in MRO)
- ImageGraph
- Graph
- __builtin__.object
Instance variables
Methods
def __init__(
self, **options)
Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required
def __init__(self,**options): defaults={"graph_name":"Image Graph", "node_names":['Image','Png'], "node_descriptions":["PIL Image","Png"], "current_node":'Image', "state":[1,0], "data":PIL.Image.open(os.path.join(TESTS_DIRECTORY,'test.png')), "edge_2_to_1":File_to_Image, "edge_1_to_2":lambda x: Image_to_FileType(x,file_path="test",extension="png")} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("Jpg","Image",lambda x: Image_to_FileType(x,file_path="test",extension="jpg"), "Image",File_to_Image,node_description="Jpg File") self.add_node("Tiff","Image",lambda x: Image_to_FileType(x,file_path="test",extension="tiff"), "Image",File_to_Image,node_description="Tif File") self.add_node("Gif","Image",lambda x: Image_to_FileType(x,file_path="test",extension="gif"), "Image",File_to_Image,node_description="Gif File") self.add_node("Bmp","Image",lambda x: Image_to_FileType(x,file_path="test",extension="bmp"), "Image",File_to_Image,node_description="BMP File") self.add_node("Base64","Png",PngFile_to_Base64, "Png",Base64_to_PngFile,node_description="Base 64 PNG") self.add_node("EmbeddedHtml","Base64",Base64Png_to_EmbeddedHtmlString, "Base64",EmbeddedHtmlString_to_Base64Png,node_description="Embedded HTML of PNG") self.add_node("Ndarray","Png",PngFile_to_Ndarray, "Png",Ndarray_to_PngFile,node_description="Numpy Array") self.add_node("MatplotlibFigure","Ndarray",Ndarray_to_MatplotlibFigure, "Png",MatplotlibFigure_to_PngFile,node_description="MatplotlibFigure") self.add_external_node("Thumbnail","Image",Image_to_ThumbnailFile,external_node_description="JPEG Thumbnail") self.add_external_node("Matplotlib","Ndarray",Ndarray_to_Matplotlib, external_node_description="Matplotlib Plot")
def add_edge(
self, begin_node=None, end_node=None, edge_function=None)
Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function
def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_external_node(
self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None)
Inheritance:
Graph.add_external_node
Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node
def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def add_jump(
self, begin_node=None, end_node=None, jump_function=None)
Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function
def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_node(
self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None)
Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.
def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def check_closed_path(
self)
Inheritance:
Graph.check_closed_path
Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node
def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out
def get_description_dictionary(
self)
Inheritance:
Graph.get_description_dictionary
returns a dictionary of the form {NodeName:Node Description for all of the current nodes
def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary
def get_entering_edges(
self, node)
Inheritance:
Graph.get_entering_edges
Returns all edges that enter the specificed node
def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges
def get_entering_nodes(
self, node)
Inheritance:
Graph.get_entering_nodes
Returns all nodes that have an edge that enter the specificed node
def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes
def get_exiting_edges(
self, node)
Inheritance:
Graph.get_exiting_edges
Returns all edges that exit the specificed node
def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges
def get_exiting_nodes(
self, node)
Inheritance:
Graph.get_exiting_nodes
Returns all nodes that have an edge leaving the specificed node
def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes
def get_path(
self, first_node, last_node, **options)
Returns the first path found between first node and last node, uses a breadth first search algorithm
def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue)))
def is_graph_isomorphic(
self)
Inheritance:
Graph.is_graph_isomorphic
Returns True if all nodes have closed paths that preserve the data, False otherwise
def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out
def is_path_valid(
self, path)
Inheritance:
Graph.is_path_valid
Returns True if the path is valid from the current node position or False otherwise
def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True
def jump_to_external_node(
self, external_node_name, **options)
Inheritance:
Graph.jump_to_external_node
Returns the result of the jump, the graph is left in the node that is the begining of the jump
def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options)
def move_to(
self, path, **options)
Changes the state of the graph by moving along the path specified
def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T
def move_to_node(
self, node)
Inheritance:
Graph.move_to_node
Moves from current_node to the specified node
def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path)
def path_length(
self, path, num_repeats=10)
Inheritance:
Graph.path_length
Determines the length of a given path, currently the metric is based on the time to move to.
def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length
def set_state(
self, node_name, node_data)
Sets the graph state to be the state specified by node_name, and node_data
def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise
def show(
self, **options)
Shows the graph using matplotlib and networkx
def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
def virtual_move_to(
self, path)
Inheritance:
Graph.virtual_move_to
virtual_move_to simulates moving but does not change the state of the graph
def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1
class MetadataGraph
Metadata Graph is a graph representing the content of key,value metadata
class MetadataGraph(Graph): """Metadata Graph is a graph representing the content of key,value metadata""" def __init__(self,**options): """Intializes the metadata graph class""" defaults={"graph_name":"Metadata Graph", "node_names":['Dictionary','JsonString'], "node_descriptions":["Python Dictionary","Json string"], "current_node":'Dictionary', "state":[1,0], "data":{"a":"First","b":"Second"}, "edge_2_to_1":JsonString_to_Dictionary, "edge_1_to_2":Dictionary_to_JsonString} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("JsonFile","JsonString",JsonString_to_JsonFile, "JsonString",JsonFile_to_JsonString,node_description="JSON File") self.add_node("XmlString","Dictionary",Dictionary_to_XmlString, "Dictionary",XmlString_to_Dictionary,node_description="XML string") self.add_node("HtmlMetaString","Dictionary",Dictionary_to_HtmlMetaString, "Dictionary",HtmlMetaString_to_Dictionary,node_description="HTML meta tags") self.add_node("XmlTupleString","Dictionary",Dictionary_to_XmlTupleString, "Dictionary",XmlTupleString_to_Dictionary,node_description="Tuple Line") self.add_node("PickleFile","Dictionary",Dictionary_to_PickleFile, "Dictionary",PickleFile_to_Dictionary,node_description="Pickled File") self.add_node("ListList","Dictionary",Dictionary_to_ListList, "Dictionary",ListList_to_Dictionary,node_description="List of lists") self.add_node("HeaderList","Dictionary",Dictionary_to_HeaderList, "Dictionary",HeaderList_to_Dictionary,node_description="Header List") self.add_node("DataFrame","Dictionary",Dictionary_to_DataFrame, "Dictionary",DataFrame_to_Dictionary,node_description="Pandas DataFrame") self.add_node("AsciiDataTable","DataFrame",DataFrame_to_AsciiDataTable, "DataFrame",AsciiDataTable_to_DataFrame,node_description="AsciiDataTable") self.add_node("MatFile","AsciiDataTable",AsciiTable_to_MatFile, "AsciiDataTable",MatFile_to_AsciiDataTableKeyValue,node_description="Matlab") self.add_node("ExcelFile","DataFrame",DataFrame_to_ExcelFile, "DataFrame",ExcelFile_to_DataFrame,node_description="excel") self.add_node("HdfFile","DataFrame",DataFrame_to_HdfFile, "DataFrame",HdfFile_to_DataFrame,node_description="hdf file") self.add_node("CsvFile","DataFrame",DataFrame_to_CsvFile, "DataFrame",CsvFile_to_DataFrame,node_description="CSV File") self.add_node("HtmlFile","DataFrame",DataFrame_to_HtmlFile, "DataFrame",HtmlFile_to_DataFrame,node_description="HTML Table File") self.add_node("HtmlTableString","HtmlFile",HtmlFile_to_HtmlString, "HtmlFile",HtmlString_to_HtmlFile,node_description="HTML Table String")
Ancestors (in MRO)
- MetadataGraph
- Graph
- __builtin__.object
Instance variables
Methods
def __init__(
self, **options)
Intializes the metadata graph class
def __init__(self,**options): """Intializes the metadata graph class""" defaults={"graph_name":"Metadata Graph", "node_names":['Dictionary','JsonString'], "node_descriptions":["Python Dictionary","Json string"], "current_node":'Dictionary', "state":[1,0], "data":{"a":"First","b":"Second"}, "edge_2_to_1":JsonString_to_Dictionary, "edge_1_to_2":Dictionary_to_JsonString} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("JsonFile","JsonString",JsonString_to_JsonFile, "JsonString",JsonFile_to_JsonString,node_description="JSON File") self.add_node("XmlString","Dictionary",Dictionary_to_XmlString, "Dictionary",XmlString_to_Dictionary,node_description="XML string") self.add_node("HtmlMetaString","Dictionary",Dictionary_to_HtmlMetaString, "Dictionary",HtmlMetaString_to_Dictionary,node_description="HTML meta tags") self.add_node("XmlTupleString","Dictionary",Dictionary_to_XmlTupleString, "Dictionary",XmlTupleString_to_Dictionary,node_description="Tuple Line") self.add_node("PickleFile","Dictionary",Dictionary_to_PickleFile, "Dictionary",PickleFile_to_Dictionary,node_description="Pickled File") self.add_node("ListList","Dictionary",Dictionary_to_ListList, "Dictionary",ListList_to_Dictionary,node_description="List of lists") self.add_node("HeaderList","Dictionary",Dictionary_to_HeaderList, "Dictionary",HeaderList_to_Dictionary,node_description="Header List") self.add_node("DataFrame","Dictionary",Dictionary_to_DataFrame, "Dictionary",DataFrame_to_Dictionary,node_description="Pandas DataFrame") self.add_node("AsciiDataTable","DataFrame",DataFrame_to_AsciiDataTable, "DataFrame",AsciiDataTable_to_DataFrame,node_description="AsciiDataTable") self.add_node("MatFile","AsciiDataTable",AsciiTable_to_MatFile, "AsciiDataTable",MatFile_to_AsciiDataTableKeyValue,node_description="Matlab") self.add_node("ExcelFile","DataFrame",DataFrame_to_ExcelFile, "DataFrame",ExcelFile_to_DataFrame,node_description="excel") self.add_node("HdfFile","DataFrame",DataFrame_to_HdfFile, "DataFrame",HdfFile_to_DataFrame,node_description="hdf file") self.add_node("CsvFile","DataFrame",DataFrame_to_CsvFile, "DataFrame",CsvFile_to_DataFrame,node_description="CSV File") self.add_node("HtmlFile","DataFrame",DataFrame_to_HtmlFile, "DataFrame",HtmlFile_to_DataFrame,node_description="HTML Table File") self.add_node("HtmlTableString","HtmlFile",HtmlFile_to_HtmlString, "HtmlFile",HtmlString_to_HtmlFile,node_description="HTML Table String")
def add_edge(
self, begin_node=None, end_node=None, edge_function=None)
Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function
def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_external_node(
self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None)
Inheritance:
Graph.add_external_node
Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node
def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def add_jump(
self, begin_node=None, end_node=None, jump_function=None)
Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function
def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_node(
self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None)
Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.
def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def check_closed_path(
self)
Inheritance:
Graph.check_closed_path
Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node
def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out
def get_description_dictionary(
self)
Inheritance:
Graph.get_description_dictionary
returns a dictionary of the form {NodeName:Node Description for all of the current nodes
def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary
def get_entering_edges(
self, node)
Inheritance:
Graph.get_entering_edges
Returns all edges that enter the specificed node
def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges
def get_entering_nodes(
self, node)
Inheritance:
Graph.get_entering_nodes
Returns all nodes that have an edge that enter the specificed node
def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes
def get_exiting_edges(
self, node)
Inheritance:
Graph.get_exiting_edges
Returns all edges that exit the specificed node
def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges
def get_exiting_nodes(
self, node)
Inheritance:
Graph.get_exiting_nodes
Returns all nodes that have an edge leaving the specificed node
def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes
def get_path(
self, first_node, last_node, **options)
Returns the first path found between first node and last node, uses a breadth first search algorithm
def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue)))
def is_graph_isomorphic(
self)
Inheritance:
Graph.is_graph_isomorphic
Returns True if all nodes have closed paths that preserve the data, False otherwise
def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out
def is_path_valid(
self, path)
Inheritance:
Graph.is_path_valid
Returns True if the path is valid from the current node position or False otherwise
def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True
def jump_to_external_node(
self, external_node_name, **options)
Inheritance:
Graph.jump_to_external_node
Returns the result of the jump, the graph is left in the node that is the begining of the jump
def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options)
def move_to(
self, path, **options)
Changes the state of the graph by moving along the path specified
def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T
def move_to_node(
self, node)
Inheritance:
Graph.move_to_node
Moves from current_node to the specified node
def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path)
def path_length(
self, path, num_repeats=10)
Inheritance:
Graph.path_length
Determines the length of a given path, currently the metric is based on the time to move to.
def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length
def set_state(
self, node_name, node_data)
Sets the graph state to be the state specified by node_name, and node_data
def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise
def show(
self, **options)
Shows the graph using matplotlib and networkx
def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
def virtual_move_to(
self, path)
Inheritance:
Graph.virtual_move_to
virtual_move_to simulates moving but does not change the state of the graph
def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1
class StringGraph
String Graph is a graph relating different string forms
class StringGraph(Graph): """String Graph is a graph relating different string forms""" def __init__(self,**options): """Intializes the StringGraph Class by defining nodes and edges""" defaults={"graph_name":"StringGraph", "node_names":['String','StringList'], "node_descriptions":["A plain string", "A list of strings with no \\n, created with string.splitlines()"], "current_node":'String', "state":[1,0], "data":"This is a test string\n it has to have multiple lines \n and many characters 34%6\n^", "edge_2_to_1":edge_2_to_1, "edge_1_to_2":edge_1_to_2 } self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("File","String",String_to_File,"String",File_to_String,node_description="Plain File") self.add_node("CStringIo","String",String_to_CStringIo,"String",CStringIo_to_String,node_description="C File Like Object") self.add_node("StringIo","String",String_to_StringIo,"String",StringIo_to_String,node_description="File Like Object") self.add_edge(begin_node="StringList",end_node="File",edge_function=StringList_to_File)
Ancestors (in MRO)
- StringGraph
- Graph
- __builtin__.object
Instance variables
Methods
def __init__(
self, **options)
Intializes the StringGraph Class by defining nodes and edges
def __init__(self,**options): """Intializes the StringGraph Class by defining nodes and edges""" defaults={"graph_name":"StringGraph", "node_names":['String','StringList'], "node_descriptions":["A plain string", "A list of strings with no \\n, created with string.splitlines()"], "current_node":'String', "state":[1,0], "data":"This is a test string\n it has to have multiple lines \n and many characters 34%6\n^", "edge_2_to_1":edge_2_to_1, "edge_1_to_2":edge_1_to_2 } self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("File","String",String_to_File,"String",File_to_String,node_description="Plain File") self.add_node("CStringIo","String",String_to_CStringIo,"String",CStringIo_to_String,node_description="C File Like Object") self.add_node("StringIo","String",String_to_StringIo,"String",StringIo_to_String,node_description="File Like Object") self.add_edge(begin_node="StringList",end_node="File",edge_function=StringList_to_File)
def add_edge(
self, begin_node=None, end_node=None, edge_function=None)
Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function
def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_external_node(
self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None)
Inheritance:
Graph.add_external_node
Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node
def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def add_jump(
self, begin_node=None, end_node=None, jump_function=None)
Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function
def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_node(
self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None)
Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.
def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def check_closed_path(
self)
Inheritance:
Graph.check_closed_path
Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node
def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out
def get_description_dictionary(
self)
Inheritance:
Graph.get_description_dictionary
returns a dictionary of the form {NodeName:Node Description for all of the current nodes
def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary
def get_entering_edges(
self, node)
Inheritance:
Graph.get_entering_edges
Returns all edges that enter the specificed node
def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges
def get_entering_nodes(
self, node)
Inheritance:
Graph.get_entering_nodes
Returns all nodes that have an edge that enter the specificed node
def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes
def get_exiting_edges(
self, node)
Inheritance:
Graph.get_exiting_edges
Returns all edges that exit the specificed node
def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges
def get_exiting_nodes(
self, node)
Inheritance:
Graph.get_exiting_nodes
Returns all nodes that have an edge leaving the specificed node
def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes
def get_path(
self, first_node, last_node, **options)
Returns the first path found between first node and last node, uses a breadth first search algorithm
def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue)))
def is_graph_isomorphic(
self)
Inheritance:
Graph.is_graph_isomorphic
Returns True if all nodes have closed paths that preserve the data, False otherwise
def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out
def is_path_valid(
self, path)
Inheritance:
Graph.is_path_valid
Returns True if the path is valid from the current node position or False otherwise
def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True
def jump_to_external_node(
self, external_node_name, **options)
Inheritance:
Graph.jump_to_external_node
Returns the result of the jump, the graph is left in the node that is the begining of the jump
def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options)
def move_to(
self, path, **options)
Changes the state of the graph by moving along the path specified
def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T
def move_to_node(
self, node)
Inheritance:
Graph.move_to_node
Moves from current_node to the specified node
def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path)
def path_length(
self, path, num_repeats=10)
Inheritance:
Graph.path_length
Determines the length of a given path, currently the metric is based on the time to move to.
def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length
def set_state(
self, node_name, node_data)
Sets the graph state to be the state specified by node_name, and node_data
def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise
def show(
self, **options)
Shows the graph using matplotlib and networkx
def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
def virtual_move_to(
self, path)
Inheritance:
Graph.virtual_move_to
virtual_move_to simulates moving but does not change the state of the graph
def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1
class TableGraph
Class that transforms column modeled data (table) from one format to another, use set_state to initialize to your data. #!python defaults={"graph_name":"Table Graph", "node_names":['DataFrame','AsciiDataTable'], "node_descriptions":["Pandas Data Frame","AsciiDataTable"], "current_node":'DataFrame', "state":[1,0], "data":pandas.DataFrame([[1,2,3],[3,4,5]],columns=["a","b","c"]), "edge_2_to_1":AsciiDataTable_to_DataFrame, "edge_1_to_2":DataFrame_to_AsciiDataTable}
class TableGraph(Graph): """Class that transforms column modeled data (table) from one format to another, use set_state to initialize to your data. #!python defaults={"graph_name":"Table Graph", "node_names":['DataFrame','AsciiDataTable'], "node_descriptions":["Pandas Data Frame","AsciiDataTable"], "current_node":'DataFrame', "state":[1,0], "data":pandas.DataFrame([[1,2,3],[3,4,5]],columns=["a","b","c"]), "edge_2_to_1":AsciiDataTable_to_DataFrame, "edge_1_to_2":DataFrame_to_AsciiDataTable} """ def __init__(self,**options): defaults={"graph_name":"Table Graph", "node_names":['DataFrame','AsciiDataTable'], "node_descriptions":["Pandas Data Frame","AsciiDataTable"], "current_node":'DataFrame', "state":[1,0], "data":pandas.DataFrame([[1,2,3],[3,4,5]],columns=["a","b","c"]), "edge_2_to_1":AsciiDataTable_to_DataFrame, "edge_1_to_2":DataFrame_to_AsciiDataTable} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("HdfFile","DataFrame",DataFrame_to_HdfFile, "DataFrame",HdfFile_to_DataFrame, node_description="HDF File") self.add_node("XmlDataTable","AsciiDataTable",AsciiDataTable_to_XmlDataTable, "AsciiDataTable",XmlDataTable_to_AsciiDataTable, node_description="XML Data Table") # Need to add XML File and Html File using save and save_HTML() self.add_node("ExcelFile","DataFrame",DataFrame_to_ExcelFile, "DataFrame",ExcelFile_to_DataFrame, node_description="Excel File") self.add_node("OdsFile","ExcelFile",ExcelFile_to_OdsFile, "ExcelFile",OdsFile_to_ExcelFile,"Open Office Spreadsheet") self.add_node("HtmlString","DataFrame",DataFrame_to_HtmlString, "DataFrame",HtmlString_to_DataFrame, node_description="HTML String") # Note a lot of the pandas reading and writing cause float64 round off errors # applymap(lambda x: np.around(x,10) any all float fields will fix this # also the column names move about in order self.add_node("JsonFile","DataFrame",DataFrame_to_JsonFile, "DataFrame",JsonFile_to_DataFrame, node_description="JSON File") self.add_node("JsonString","DataFrame",DataFrame_to_JsonString, "DataFrame",JsonString_to_DataFrame, node_description="JSON String") self.add_node("CsvFile","DataFrame",DataFrame_to_CsvFile, "DataFrame",CsvFile_to_DataFrame, node_description="CSV File") self.add_node("MatFile","AsciiDataTable",AsciiTable_to_MatFile, "AsciiDataTable",MatFile_to_AsciiTable, node_description="Matlab File") self.add_node("XmlFile","XmlDataTable",XmlDataTable_to_XmlFile, "XmlDataTable",XmlFile_to_XmlDataTable, node_description="XML DataTable Saved As a File") self.add_node("HtmlFile","HtmlString",HtmlString_to_HtmlFile, "HtmlString",HtmlFile_to_HtmlString, node_description="HTML File") self.add_edge("DataFrame","HtmlFile",DataFrame_to_HtmlFile) self.add_edge("JsonFile","XmlDataTable",JsonFile_to_XmlDataTable) self.add_external_node("XsltResultString","XmlDataTable",XmlBase_to_XsltResultString, external_node_description="XSLT Results String") self.add_external_node("XsltResultFile","XmlDataTable",XmlBase_to_XsltResultFile, external_node_description="XSLT Results File")
Ancestors (in MRO)
- TableGraph
- Graph
- __builtin__.object
Instance variables
Methods
def __init__(
self, **options)
Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required
def __init__(self,**options): defaults={"graph_name":"Table Graph", "node_names":['DataFrame','AsciiDataTable'], "node_descriptions":["Pandas Data Frame","AsciiDataTable"], "current_node":'DataFrame', "state":[1,0], "data":pandas.DataFrame([[1,2,3],[3,4,5]],columns=["a","b","c"]), "edge_2_to_1":AsciiDataTable_to_DataFrame, "edge_1_to_2":DataFrame_to_AsciiDataTable} self.options={} for key,value in defaults.items(): self.options[key]=value for key,value in options.items(): self.options[key]=value Graph.__init__(self,**self.options) self.add_node("HdfFile","DataFrame",DataFrame_to_HdfFile, "DataFrame",HdfFile_to_DataFrame, node_description="HDF File") self.add_node("XmlDataTable","AsciiDataTable",AsciiDataTable_to_XmlDataTable, "AsciiDataTable",XmlDataTable_to_AsciiDataTable, node_description="XML Data Table") # Need to add XML File and Html File using save and save_HTML() self.add_node("ExcelFile","DataFrame",DataFrame_to_ExcelFile, "DataFrame",ExcelFile_to_DataFrame, node_description="Excel File") self.add_node("OdsFile","ExcelFile",ExcelFile_to_OdsFile, "ExcelFile",OdsFile_to_ExcelFile,"Open Office Spreadsheet") self.add_node("HtmlString","DataFrame",DataFrame_to_HtmlString, "DataFrame",HtmlString_to_DataFrame, node_description="HTML String") # Note a lot of the pandas reading and writing cause float64 round off errors # applymap(lambda x: np.around(x,10) any all float fields will fix this # also the column names move about in order self.add_node("JsonFile","DataFrame",DataFrame_to_JsonFile, "DataFrame",JsonFile_to_DataFrame, node_description="JSON File") self.add_node("JsonString","DataFrame",DataFrame_to_JsonString, "DataFrame",JsonString_to_DataFrame, node_description="JSON String") self.add_node("CsvFile","DataFrame",DataFrame_to_CsvFile, "DataFrame",CsvFile_to_DataFrame, node_description="CSV File") self.add_node("MatFile","AsciiDataTable",AsciiTable_to_MatFile, "AsciiDataTable",MatFile_to_AsciiTable, node_description="Matlab File") self.add_node("XmlFile","XmlDataTable",XmlDataTable_to_XmlFile, "XmlDataTable",XmlFile_to_XmlDataTable, node_description="XML DataTable Saved As a File") self.add_node("HtmlFile","HtmlString",HtmlString_to_HtmlFile, "HtmlString",HtmlFile_to_HtmlString, node_description="HTML File") self.add_edge("DataFrame","HtmlFile",DataFrame_to_HtmlFile) self.add_edge("JsonFile","XmlDataTable",JsonFile_to_XmlDataTable) self.add_external_node("XsltResultString","XmlDataTable",XmlBase_to_XsltResultString, external_node_description="XSLT Results String") self.add_external_node("XsltResultFile","XmlDataTable",XmlBase_to_XsltResultFile, external_node_description="XSLT Results File")
def add_edge(
self, begin_node=None, end_node=None, edge_function=None)
Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function
def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_external_node(
self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None)
Inheritance:
Graph.add_external_node
Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node
def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def add_jump(
self, begin_node=None, end_node=None, jump_function=None)
Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function
def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_node(
self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None)
Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.
def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def check_closed_path(
self)
Inheritance:
Graph.check_closed_path
Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node
def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out
def get_description_dictionary(
self)
Inheritance:
Graph.get_description_dictionary
returns a dictionary of the form {NodeName:Node Description for all of the current nodes
def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary
def get_entering_edges(
self, node)
Inheritance:
Graph.get_entering_edges
Returns all edges that enter the specificed node
def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges
def get_entering_nodes(
self, node)
Inheritance:
Graph.get_entering_nodes
Returns all nodes that have an edge that enter the specificed node
def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes
def get_exiting_edges(
self, node)
Inheritance:
Graph.get_exiting_edges
Returns all edges that exit the specificed node
def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges
def get_exiting_nodes(
self, node)
Inheritance:
Graph.get_exiting_nodes
Returns all nodes that have an edge leaving the specificed node
def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes
def get_path(
self, first_node, last_node, **options)
Returns the first path found between first node and last node, uses a breadth first search algorithm
def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue)))
def is_graph_isomorphic(
self)
Inheritance:
Graph.is_graph_isomorphic
Returns True if all nodes have closed paths that preserve the data, False otherwise
def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out
def is_path_valid(
self, path)
Inheritance:
Graph.is_path_valid
Returns True if the path is valid from the current node position or False otherwise
def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True
def jump_to_external_node(
self, external_node_name, **options)
Inheritance:
Graph.jump_to_external_node
Returns the result of the jump, the graph is left in the node that is the begining of the jump
def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options)
def move_to(
self, path, **options)
Changes the state of the graph by moving along the path specified
def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T
def move_to_node(
self, node)
Inheritance:
Graph.move_to_node
Moves from current_node to the specified node
def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path)
def path_length(
self, path, num_repeats=10)
Inheritance:
Graph.path_length
Determines the length of a given path, currently the metric is based on the time to move to.
def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length
def set_state(
self, node_name, node_data)
Sets the graph state to be the state specified by node_name, and node_data
def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise
def show(
self, **options)
Shows the graph using matplotlib and networkx
def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
def virtual_move_to(
self, path)
Inheritance:
Graph.virtual_move_to
virtual_move_to simulates moving but does not change the state of the graph
def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1
class TwoPortParameterGraph
TwoPortParamterGraph is a content graph for two-port parameters, it transforms between S,T,Y,Z,ABCD and H parameters and matrix versions. #!python defaults={"graph_name":"Two Port Parameter Graph", "node_names":["SFrequencyList",'SFrequencyMatrixList'], "node_descriptions":["S Parameters","S Parameters in a Matrix"], "current_node":'SFrequencyList', "state":[1,0], "data":[[1.0,.9,.436,.436,.9]], "edge_2_to_1":FrequencyMatrixList_to_FrequencyList, "edge_1_to_2":FrequencyList_to_FrequencyMatrixList, "frequency_units":"GHz", "Z01":50, "Z02":50 }
class TwoPortParameterGraph(Graph): """TwoPortParamterGraph is a content graph for two-port parameters, it transforms between S,T,Y,Z,ABCD and H parameters and matrix versions. #!python defaults={"graph_name":"Two Port Parameter Graph", "node_names":["SFrequencyList",'SFrequencyMatrixList'], "node_descriptions":["S Parameters","S Parameters in a Matrix"], "current_node":'SFrequencyList', "state":[1,0], "data":[[1.0,.9,.436,.436,.9]], "edge_2_to_1":FrequencyMatrixList_to_FrequencyList, "edge_1_to_2":FrequencyList_to_FrequencyMatrixList, "frequency_units":"GHz", "Z01":50, "Z02":50 } """ def __init__(self,**options): defaults={"graph_name":"Two Port Parameter Graph", "node_names":["SFrequencyList",'SFrequencyMatrixList'], "node_descriptions":["S Parameters","S Parameters in a Matrix"], "current_node":'SFrequencyList', "state":[1,0], "data":[[1.0,.9,.436,.436,.9]], "edge_2_to_1":FrequencyMatrixList_to_FrequencyList, "edge_1_to_2":FrequencyList_to_FrequencyMatrixList, "frequency_units":"GHz", "Z01":50, "Z02":50 } graph_options={} for key,value in defaults.items(): graph_options[key]=value for key,value in options.items(): graph_options[key]=value Graph.__init__(self,**graph_options) self.add_node("TFrequencyMatrixList", "SFrequencyMatrixList",SFrequencyMatrixList_to_TFrequencyMatrixList, "SFrequencyMatrixList",TFrequencyMatrixList_to_SFrequencyMatrixList, "T Parameters in a Matrix") self.add_node("TFrequencyList", "TFrequencyMatrixList",FrequencyMatrixList_to_FrequencyList, "TFrequencyMatrixList",FrequencyList_to_FrequencyMatrixList, "T Parameters") self.add_node("ZFrequencyList", "SFrequencyList",SFrequencyList_to_ZFrequencyList, "TFrequencyList",ZFrequencyList_to_TFrequencyList, "Z Parameters") self.add_node("ZFrequencyMatrixList", "ZFrequencyList",FrequencyList_to_FrequencyMatrixList, "ZFrequencyList",FrequencyMatrixList_to_FrequencyList, "Z Parameters in a matrix") self.add_node("ABCDFrequencyList", "ZFrequencyList",ZFrequencyList_to_ABCDFrequencyList, "ZFrequencyList",ABCDFrequencyList_to_ZFrequencyList, "ABCD Parameters") self.add_node("ABCDFrequencyMatrixList", "ABCDFrequencyList",FrequencyList_to_FrequencyMatrixList, "ABCDFrequencyList",FrequencyMatrixList_to_FrequencyList, "ABCD Parameters in a matrix") self.add_node("HFrequencyList", "ABCDFrequencyList",ABCDFrequencyList_to_HFrequencyList, "ZFrequencyList",HFrequencyList_to_ZFrequencyList, "h Parameters") self.add_node("HFrequencyMatrixList", "HFrequencyList",FrequencyList_to_FrequencyMatrixList, "HFrequencyList",FrequencyMatrixList_to_FrequencyList, "H Parameters in a matrix") self.add_node("YFrequencyList", "ABCDFrequencyList",ABCDFrequencyList_to_YFrequencyList, "HFrequencyList",YFrequencyList_to_HFrequencyList, "Y Parameters") self.add_node("YFrequencyMatrixList", "YFrequencyList",FrequencyList_to_FrequencyMatrixList, "YFrequencyList",FrequencyMatrixList_to_FrequencyList, "Y Parameters in a matrix") self.add_edge(begin_node="ZFrequencyMatrixList", end_node="YFrequencyMatrixList", edge_function=ZFrequencyMatrixList_to_YFrequencyMatrixList) self.add_edge(begin_node="SFrequencyMatrixList", end_node="ZFrequencyMatrixList", edge_function=SFrequencyMatrixList_to_ZFrequencyMatrixList) self.add_edge(begin_node="ZFrequencyMatrixList", end_node="TFrequencyMatrixList", edge_function=ZFrequencyMatrixList_to_TFrequencyMatrixList) self.add_edge(begin_node="ABCDFrequencyList", end_node="SFrequencyList", edge_function=ABCDFrequencyList_to_SFrequencyList)
Ancestors (in MRO)
- TwoPortParameterGraph
- Graph
- __builtin__.object
Instance variables
Methods
def __init__(
self, **options)
Initializes the graph. The first 2 nodes and two edges forming a bijection between them are required
def __init__(self,**options): defaults={"graph_name":"Two Port Parameter Graph", "node_names":["SFrequencyList",'SFrequencyMatrixList'], "node_descriptions":["S Parameters","S Parameters in a Matrix"], "current_node":'SFrequencyList', "state":[1,0], "data":[[1.0,.9,.436,.436,.9]], "edge_2_to_1":FrequencyMatrixList_to_FrequencyList, "edge_1_to_2":FrequencyList_to_FrequencyMatrixList, "frequency_units":"GHz", "Z01":50, "Z02":50 } graph_options={} for key,value in defaults.items(): graph_options[key]=value for key,value in options.items(): graph_options[key]=value Graph.__init__(self,**graph_options) self.add_node("TFrequencyMatrixList", "SFrequencyMatrixList",SFrequencyMatrixList_to_TFrequencyMatrixList, "SFrequencyMatrixList",TFrequencyMatrixList_to_SFrequencyMatrixList, "T Parameters in a Matrix") self.add_node("TFrequencyList", "TFrequencyMatrixList",FrequencyMatrixList_to_FrequencyList, "TFrequencyMatrixList",FrequencyList_to_FrequencyMatrixList, "T Parameters") self.add_node("ZFrequencyList", "SFrequencyList",SFrequencyList_to_ZFrequencyList, "TFrequencyList",ZFrequencyList_to_TFrequencyList, "Z Parameters") self.add_node("ZFrequencyMatrixList", "ZFrequencyList",FrequencyList_to_FrequencyMatrixList, "ZFrequencyList",FrequencyMatrixList_to_FrequencyList, "Z Parameters in a matrix") self.add_node("ABCDFrequencyList", "ZFrequencyList",ZFrequencyList_to_ABCDFrequencyList, "ZFrequencyList",ABCDFrequencyList_to_ZFrequencyList, "ABCD Parameters") self.add_node("ABCDFrequencyMatrixList", "ABCDFrequencyList",FrequencyList_to_FrequencyMatrixList, "ABCDFrequencyList",FrequencyMatrixList_to_FrequencyList, "ABCD Parameters in a matrix") self.add_node("HFrequencyList", "ABCDFrequencyList",ABCDFrequencyList_to_HFrequencyList, "ZFrequencyList",HFrequencyList_to_ZFrequencyList, "h Parameters") self.add_node("HFrequencyMatrixList", "HFrequencyList",FrequencyList_to_FrequencyMatrixList, "HFrequencyList",FrequencyMatrixList_to_FrequencyList, "H Parameters in a matrix") self.add_node("YFrequencyList", "ABCDFrequencyList",ABCDFrequencyList_to_YFrequencyList, "HFrequencyList",YFrequencyList_to_HFrequencyList, "Y Parameters") self.add_node("YFrequencyMatrixList", "YFrequencyList",FrequencyList_to_FrequencyMatrixList, "YFrequencyList",FrequencyMatrixList_to_FrequencyList, "Y Parameters in a matrix") self.add_edge(begin_node="ZFrequencyMatrixList", end_node="YFrequencyMatrixList", edge_function=ZFrequencyMatrixList_to_YFrequencyMatrixList) self.add_edge(begin_node="SFrequencyMatrixList", end_node="ZFrequencyMatrixList", edge_function=SFrequencyMatrixList_to_ZFrequencyMatrixList) self.add_edge(begin_node="ZFrequencyMatrixList", end_node="TFrequencyMatrixList", edge_function=ZFrequencyMatrixList_to_TFrequencyMatrixList) self.add_edge(begin_node="ABCDFrequencyList", end_node="SFrequencyList", edge_function=ABCDFrequencyList_to_SFrequencyList)
def add_edge(
self, begin_node=None, end_node=None, edge_function=None)
Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function
def add_edge(self, begin_node=None, end_node=None, edge_function=None): """Adds an edge mapping one node to another, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number edge_match = re.compile("edge_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(edge_match, key): iterator += 1 edge_name = "edge_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[edge_name] = edge_function self.edges.append(edge_name) edge_matrix = np.zeros((len(self.state), len(self.state))) begin_position = self.node_names.index(begin_node) end_position = self.node_names.index(end_node) edge_matrix[end_position][begin_position] = 1 edge_matrix = np.matrix(edge_matrix) self.edge_matrices.append(edge_matrix) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_external_node(
self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None)
Inheritance:
Graph.add_external_node
Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node
def add_external_node(self, external_node_name, jump_into_node_begin, jump_into_node_function, external_node_description=None): """Adds an external node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new external node""" # first check if node into and out of node is good self.external_node_names.append(external_node_name) self.add_jump(begin_node=jump_into_node_begin, end_node=external_node_name, jump_function=jump_into_node_function) if external_node_description: self.external_node_descriptions.append(external_node_description) self.display_graph.add_node(external_node_name) self.display_graph.add_edge(jump_into_node_begin, external_node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def add_jump(
self, begin_node=None, end_node=None, jump_function=None)
Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function
def add_jump(self, begin_node=None, end_node=None, jump_function=None): """Adds a jump mapping one internal node to an external node, required input is begin_node (it's name) end_node, and the edge function""" # check to see if edge is defined if it is increment a number jump_match = re.compile("jump_{0}_{1}".format(begin_node, end_node)) keys = list(self.__dict__.keys()) # print keys iterator = 0 for key in keys: if re.match(jump_match, key): iterator += 1 jump_name = "jump_{0}_{1}_{2:0>3d}".format(begin_node, end_node, iterator) self.__dict__[jump_name] = jump_function self.jumps.append(jump_name) self.display_graph.add_edge(begin_node, end_node) self.display_layout = networkx.spring_layout(self.display_graph)
def add_node(
self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None)
Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.
def add_node(self, node_name, edge_into_node_begin, edge_into_node_function, edge_out_node_end, edge_out_node_function, node_description=None): """Adds a node to the graph. Required input is node_name (a string with no spaces), a reference to an entering node,the function mapping the entering node to the new node, a reference to an exiting node and the function mapping the new node to the exiting node.""" # first check if node into and out of node is good self.node_names.append(node_name) self.state.append(0) self.state_matrix = np.matrix(self.state).T for index, matrix in enumerate(self.edge_matrices): pad_row = np.zeros((1, len(matrix))) new_matrix = np.concatenate((matrix, pad_row), axis=0) pad_column = np.zeros((1, len(self.node_names))) new_matrix = np.concatenate((new_matrix, pad_column.T), axis=1) # print("New matrix is :\n{0}".format(new_matrix)) self.edge_matrices[index] = new_matrix self.add_edge(begin_node=node_name, end_node=edge_out_node_end, edge_function=edge_out_node_function) self.add_edge(begin_node=edge_into_node_begin, end_node=node_name, edge_function=edge_into_node_function) if node_description: self.node_descriptions.append(node_description) self.display_graph.add_node(node_name) self.display_graph.add_edge(node_name, edge_out_node_end) self.display_graph.add_edge(edge_into_node_begin, node_name) self.display_layout = networkx.spring_layout(self.display_graph)
def check_closed_path(
self)
Inheritance:
Graph.check_closed_path
Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node
def check_closed_path(self): """Checks that data is not changed for the first closed path found. Returns True if data==data after moving around the closed path, False otherwise. Starting point is current_node """ temp_data = self.data path = self.get_path(self.current_node, self.current_node) if self.is_path_valid(path): pass else: print("Path is not valid, graph definition is broken") raise out = temp_data == self.data out_list = [self.current_node, path, out] print(("The assertion that the data remains unchanged,\n" "for node {0} following path {1} is {2}".format(*out_list))) return out
def get_description_dictionary(
self)
Inheritance:
Graph.get_description_dictionary
returns a dictionary of the form {NodeName:Node Description for all of the current nodes
def get_description_dictionary(self): "returns a dictionary of the form {NodeName:Node Description for all of the current nodes" dictionary = {node_name: self.node_descriptions[index] for index, node_name in enumerate(self.node_names)} return dictionary
def get_entering_edges(
self, node)
Inheritance:
Graph.get_entering_edges
Returns all edges that enter the specificed node
def get_entering_edges(self, node): """Returns all edges that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_edges = [] for index, edge in enumerate(self.edges): if re.match(enter_edge_pattern, edge): enter_edges.append(edge) return enter_edges
def get_entering_nodes(
self, node)
Inheritance:
Graph.get_entering_nodes
Returns all nodes that have an edge that enter the specificed node
def get_entering_nodes(self, node): """Returns all nodes that have an edge that enter the specificed node""" enter_edge_pattern = re.compile('edge_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(node)) enter_nodes = [] for index, edge in enumerate(self.edges): enter_match = re.match(enter_edge_pattern, edge) if enter_match: enter_node = enter_match.groupdict()['begin_node'] enter_nodes.append(enter_node) return enter_nodes
def get_exiting_edges(
self, node)
Inheritance:
Graph.get_exiting_edges
Returns all edges that exit the specificed node
def get_exiting_edges(self, node): """Returns all edges that exit the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_edges = [] for index, edge in enumerate(self.edges): if re.match(exit_edge_pattern, edge): exit_edges.append(edge) return exit_edges
def get_exiting_nodes(
self, node)
Inheritance:
Graph.get_exiting_nodes
Returns all nodes that have an edge leaving the specificed node
def get_exiting_nodes(self, node): """Returns all nodes that have an edge leaving the specificed node""" exit_edge_pattern = re.compile('edge_{0}_(?P<end_node>\w+)_(?P<iterator>\w+)'.format(node)) exit_nodes = [] for index, edge in enumerate(self.edges): exit_match = re.match(exit_edge_pattern, edge) if exit_match: exit_node = exit_match.groupdict()['end_node'] exit_nodes.append(exit_node) return exit_nodes
def get_path(
self, first_node, last_node, **options)
Returns the first path found between first node and last node, uses a breadth first search algorithm
def get_path(self, first_node, last_node, **options): """Returns the first path found between first node and last node, uses a breadth first search algorithm""" defaults = {"debug": False, "method": "BreathFirst"} self.get_path_options = {} for key, value in defaults.items(): self.get_path_options[key] = value for key, value in options.items(): self.get_path_options[key] = value unvisited_nodes = self.node_names[:] unvisited_nodes.remove(first_node) visited_nodes = [first_node] node_history = [] edge_history = [] path_queue = [] possible_paths = [] queue = [] current_edge = [] queue.append(first_node) path = {first_node: []} while queue: # first remove the current_node = queue.pop(0) if path_queue != []: current_edge = path_queue.pop(0) edge_history.append(current_edge) node_history.append(current_node) if self.get_path_options["debug"]: print(("current_node is {0}".format(current_node))) print(("current_edge is {0}".format(current_edge))) # if this node is the destination exit returning the path if current_node == last_node: if self.get_path_options["debug"]: print(("Node path was found to be {0}".format(node_path))) print(("path was found to be {0}".format(edge_path))) print(("{0} is {1}".format("path", path))) return path[last_node][::-1] adjacent_nodes = self.get_exiting_nodes(current_node) adjacent_paths = self.get_exiting_edges(current_node) if self.get_path_options["debug"]: print(("{0} are {1}".format("adjacent_nodes", adjacent_nodes))) print(("{0} are {1}".format("adjacent_paths", adjacent_paths))) current_history = edge_history for node_index, node in enumerate(adjacent_nodes): if node not in visited_nodes: queue.append(node) path_queue.append(adjacent_paths[node_index]) visited_nodes.append(node) path[node] = [adjacent_paths[node_index]] + path[current_node] path[node] # possible_paths.append(current_path.append(node)) if self.get_path_options["debug"]: print(("{0} is {1}".format("path_queue", path_queue)))
def is_graph_isomorphic(
self)
Inheritance:
Graph.is_graph_isomorphic
Returns True if all nodes have closed paths that preserve the data, False otherwise
def is_graph_isomorphic(self): """Returns True if all nodes have closed paths that preserve the data, False otherwise""" out = True for node in self.node_names: self.move_to_node(node) if not self.check_closed_path: out = False return out
def is_path_valid(
self, path)
Inheritance:
Graph.is_path_valid
Returns True if the path is valid from the current node position or False otherwise
def is_path_valid(self, path): """Returns True if the path is valid from the current node position or False otherwise""" null_state = [0 for i in range(len(self.node_names))] null_state_matrix = np.matrix(null_state).T new_state = np.matrix(self.state).T for index, edge in enumerate(path): # print index # print edge edge_position = self.edges.index(edge) move_matrix = self.edge_matrices[edge_position] # print move_matrix new_state = move_matrix * new_state if new_state.any() == null_state_matrix.any(): # print new_state # print null_state_matrix return False return True
def jump_to_external_node(
self, external_node_name, **options)
Inheritance:
Graph.jump_to_external_node
Returns the result of the jump, the graph is left in the node that is the begining of the jump
def jump_to_external_node(self, external_node_name, **options): """Returns the result of the jump, the graph is left in the node that is the begining of the jump""" end_node = external_node_name jump_pattern = 'jump_(?P<begin_node>\w+)_{0}_(?P<iterator>\w+)'.format(end_node) for jump in self.jumps[:]: jump_match = re.match(jump_pattern, jump, re.IGNORECASE) if jump_match: jump_to_use = jump begin_node = jump_match.groupdict()["begin_node"] self.move_to_node(begin_node) return self.__dict__[jump_to_use](self.data, **options)
def move_to(
self, path, **options)
Changes the state of the graph by moving along the path specified
def move_to(self, path, **options): """Changes the state of the graph by moving along the path specified""" defaults = {"debug": False, "verbose": False} move_options = {} for key, value in defaults.items(): move_options[key] = value for key, value in options.items(): move_options[key] = value if move_options["debug"]: print(path) for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] if move_options["verbose"]: print(("moving {0} -> {1}".format(begin_node, end_node))) # print self.data self.data = self.__dict__[edge](self.data) # print self.data self.current_node = match.groupdict()['end_node'] self.state = [0 for i in range(len(self.node_names))] position = self.node_names.index(self.current_node) self.state[position] = 1 self.state_matrix = np.matrix(self.state).T
def move_to_node(
self, node)
Inheritance:
Graph.move_to_node
Moves from current_node to the specified node
def move_to_node(self, node): """Moves from current_node to the specified node""" path = self.get_path(self.current_node, node) self.move_to(path)
def path_length(
self, path, num_repeats=10)
Inheritance:
Graph.path_length
Determines the length of a given path, currently the metric is based on the time to move to.
def path_length(self, path, num_repeats=10): """Determines the length of a given path, currently the metric is based on the time to move to.""" begin_time = datetime.datetime.now() # num_repeats=100 for i in range(num_repeats): self.virtual_move_to(path) end_time = datetime.datetime.now() delta_t = end_time - begin_time path_length = delta_t.total_seconds() / float(num_repeats) if path_length == 0.0: print("Warning the path length is less than 1 microsecond," "make sure num_repeats is high enough to measure it.") return path_length
def set_state(
self, node_name, node_data)
Sets the graph state to be the state specified by node_name, and node_data
def set_state(self, node_name, node_data): """Sets the graph state to be the state specified by node_name, and node_data""" try: current_node_state_position = self.node_names.index(node_name) self.current_node = node_name self.data = node_data self.state = [0 for i in range(len(self.node_names))] self.state[current_node_state_position] = 1 self.state_matrix = np.matrix(self.state).T except: print(("Could not set the state of graph: {0}".format(self.graph_name))) raise
def show(
self, **options)
Shows the graph using matplotlib and networkx
def show(self, **options): """Shows the graph using matplotlib and networkx""" # Should be seperated to allow for fixed presentation? defaults = {"descriptions": False, "edge_descriptions": False, "save_plot": False, "path": None, "active_node": True, "directory": None, "specific_descriptor": self.graph_name.replace(" ", "_"), "general_descriptor": "plot", "file_name": None, "arrows": True, "node_size": 1000, "font_size": 10, "fix_layout": True} show_options = {} for key, value in defaults.items(): show_options[key] = value for key, value in options.items(): show_options[key] = value if show_options["directory"] is None: show_options["directory"] = os.getcwd() if show_options["active_node"]: node_colors = [] for node in self.display_graph.nodes(): if node == self.current_node: node_colors.append('b') else: if node in self.node_names: node_colors.append('r') elif node in self.external_node_names: node_colors.append('g') else: node_colors = ['r' for node in self.node_names] + ['g' for node in self.node_names] # print("{0} is {1}".format('node_colors',node_colors)) if show_options["descriptions"]: node_labels = {node: self.node_descriptions[index] for index, node in enumerate(self.node_names)} if self.external_node_names: for index, node in enumerate(self.external_node_names): node_labels[node] = self.external_node_descriptions[index] networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], labels=node_labels, node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) # print("{0} is {1}".format('node_labels',node_labels)) else: networkx.draw_networkx(self.display_graph, arrows=show_options["arrows"], node_color=node_colors, node_size=show_options["node_size"], font_size=show_options["font_size"], pos=self.display_layout) plt.axis('off') plt.suptitle(self.options["graph_name"]) if show_options["file_name"] is None: file_name = auto_name(specific_descriptor=show_options["specific_descriptor"], general_descriptor=show_options["general_descriptor"], directory=show_options["directory"], extension='png', padding=3) else: file_name = show_options["file_name"] if show_options["save_plot"]: # print file_name if show_options["path"]: plt.savefig(show_options["path"]) else: plt.savefig(os.path.join(show_options["directory"], file_name)) else: plt.show() fig = plt.gcf() return fig
def virtual_move_to(
self, path)
Inheritance:
Graph.virtual_move_to
virtual_move_to simulates moving but does not change the state of the graph
def virtual_move_to(self, path): """virtual_move_to simulates moving but does not change the state of the graph""" # print path temp_state = self.state temp_data = self.data temp_current_node = self.current_node temp_node_names = self.node_names for index, edge in enumerate(path): # print edge edge_pattern = 'edge_(?P<begin_node>\w+)_(?P<end_node>\w+)_(?P<iterator>\w+)' match = re.match(edge_pattern, edge) begin_node = match.groupdict()['begin_node'] end_node = match.groupdict()['end_node'] # print("moving {0} -> {1}".format(begin_node,end_node)) # print self.data temp_data = self.__dict__[edge](temp_data) # print self.data temp_current_node = match.groupdict()['end_node'] temp_state = [0 for i in range(len(temp_node_names))] position = temp_node_names.index(temp_current_node) temp_state[position] = 1
Module variables
var COMBINE_S11_S22
var COMMENT_PATTERN
var CONVERT_S21
var DEFAULT_FILE_NAME
var DEFAULT_INSTRUMENT_SHEET_STYLE
var DEFAULT_INSTRUMENT_STATE_STYLE
var DEFAULT_LOG_STYLE
var DEFAULT_MEASUREMENT_STYLE
var DEFAULT_METADATA_STYLE
var DEFAULT_REGISTER_STYLE
var DEFAULT_STYLE
var DRIVER_FILE_EXTENSIONS
var DUT_COLUMN_NAMES
var EXIF_AVAILABLE
var EXTENSION_PATTERN
var FORMATS
var FREQUENCY_UNITS
var GENERAL_DESCRIPTORS
var GET_STATS_FIELDS
var HTML_TEMPLATE_DIRECTORY
var IMAGE_FILE_EXTENSIONS
var INKSCAPE_PATH
var INSTRUMENT_SHEETS
var METHOD_ALIASES
var MINIMUM_DB_ARG_VALUE
var MINIMUM_DB_VALUE
var NODE_TYPE_DICTIONARY
var NUMBER_MATCH_STRING
var ONE_PORT_COLUMN_NAMES
var OPTION_LINE_PATTERN
var OS_STAT_FIELDS
var PARAMETERS
var PDF_CONVERT
var PIL_AVAILABLE
var POWER_3TERM_COLUMN_DESCRIPTIONS
var POWER_3TERM_COLUMN_NAMES
var POWER_4TERM_COLUMN_DESCRIPTIONS
var POWER_4TERM_COLUMN_NAMES
var PYMEASURE_ROOT
var RESULTS_FILE_ONE_PORT_COLUMN_NAMES
var RESULTS_FILE_POWER_COLUMN_NAMES
var RESULTS_FILE_TWO_PORT_COLUMN_NAMES
var S1P_DB_COLUMN_NAMES
var S1P_MA_COLUMN_NAMES
var S1P_RI_COLUMN_NAMES
var S2P_COMPLEX_COLUMN_NAMES
var S2P_DB_COLUMN_DESCRIPTION
var S2P_DB_COLUMN_NAMES
var S2P_MA_COLUMN_DESCRIPTION
var S2P_MA_COLUMN_NAMES
var S2P_NOISE_PARAMETER_COLUMN_NAMES
var S2P_RI_COLUMN_DESCRIPTION
var S2P_RI_COLUMN_NAMES
var SMITHPLOT
var SOLUTION_VECTOR_COLUMN_NAMES
var StringTypes
var TESTS_DIRECTORY
var TOUCHSTONE_KEYWORDS
var TWELVE_TERM_ERROR_COLUMN_NAMES
var WINDOWS_COM
var WINDOWS_WRAPPER
var WKHTML_PATH
var XSLT_CAPABLE
var XSLT_REPOSITORY
var type_names