diff options
author | jblum | 2009-06-23 20:38:18 +0000 |
---|---|---|
committer | jblum | 2009-06-23 20:38:18 +0000 |
commit | 9988664127b367fa8fee4409f8460673d6f265e1 (patch) | |
tree | 96752c15b7f1447e5e78a7282d1de141f9e0000b /grc/src/platforms/base/FlowGraph.py | |
parent | 885e6fe1fd0e06476511c79515f34ffcef50287d (diff) | |
download | gnuradio-9988664127b367fa8fee4409f8460673d6f265e1.tar.gz gnuradio-9988664127b367fa8fee4409f8460673d6f265e1.tar.bz2 gnuradio-9988664127b367fa8fee4409f8460673d6f265e1.zip |
Merging r11186:11273 from grc branch.
Fixes, features, and reorganization for grc.
Minor fixes and features for wxgui forms.
git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@11274 221aa14e-8319-0410-a670-987f0aec2ac5
Diffstat (limited to 'grc/src/platforms/base/FlowGraph.py')
-rw-r--r-- | grc/src/platforms/base/FlowGraph.py | 224 |
1 files changed, 0 insertions, 224 deletions
diff --git a/grc/src/platforms/base/FlowGraph.py b/grc/src/platforms/base/FlowGraph.py deleted file mode 100644 index 6aeef2fa7..000000000 --- a/grc/src/platforms/base/FlowGraph.py +++ /dev/null @@ -1,224 +0,0 @@ -""" -Copyright 2008, 2009 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -from ... utils import odict -from Element import Element -from Block import Block -from Connection import Connection -from ... gui import Messages - -class FlowGraph(Element): - - def __init__(self, platform): - """ - Make a flow graph from the arguments. - @param platform a platforms with blocks and contrcutors - @return the flow graph object - """ - #initialize - Element.__init__(self, platform) - #inital blank import - self.import_data() - - def _get_unique_id(self, base_id=''): - """ - Get a unique id starting with the base id. - @param base_id the id starts with this and appends a count - @return a unique id - """ - index = 0 - while True: - id = '%s_%d'%(base_id, index) - index = index + 1 - #make sure that the id is not used by another block - if not filter(lambda b: b.get_id() == id, self.get_blocks()): return id - - def __str__(self): return 'FlowGraph - %s(%s)'%(self.get_option('title'), self.get_option('id')) - - def get_option(self, key): - """ - Get the option for a given key. - The option comes from the special options block. - @param key the param key for the options block - @return the value held by that param - """ - return self._options_block.get_param(key).evaluate() - - def is_flow_graph(self): return True - - ############################################## - ## Access Elements - ############################################## - def get_block(self, id): return filter(lambda b: b.get_id() == id, self.get_blocks())[0] - def get_blocks(self): return filter(lambda e: e.is_block(), self.get_elements()) - def get_connections(self): return filter(lambda e: e.is_connection(), self.get_elements()) - def get_elements(self): - """ - Get a list of all the elements. - Always ensure that the options block is in the list (only once). - @return the element list - """ - options_block_count = self._elements.count(self._options_block) - if not options_block_count: - self._elements.append(self._options_block) - for i in range(options_block_count-1): - self._elements.remove(self._options_block) - return self._elements - - def get_enabled_blocks(self): - """ - Get a list of all blocks that are enabled. - @return a list of blocks - """ - return filter(lambda b: b.get_enabled(), self.get_blocks()) - - def get_enabled_connections(self): - """ - Get a list of all connections that are enabled. - @return a list of connections - """ - return filter(lambda c: c.get_enabled(), self.get_connections()) - - def get_new_block(self, key): - """ - Get a new block of the specified key. - Add the block to the list of elements. - @param key the block key - @return the new block or None if not found - """ - self.flag() - if key not in self.get_parent().get_block_keys(): return None - block = self.get_parent().get_new_block(self, key) - self.get_elements().append(block) - return block - - def connect(self, porta, portb): - """ - Create a connection between porta and portb. - @param porta a port - @param portb another port - @throw Exception bad connection - @return the new connection - """ - self.flag() - connection = self.get_parent().Connection(self, porta, portb) - self.get_elements().append(connection) - return connection - - def remove_element(self, element): - """ - Remove the element from the list of elements. - If the element is a port, remove the whole block. - If the element is a block, remove its connections. - If the element is a connection, just remove the connection. - """ - self.flag() - if element not in self.get_elements(): return - #found a port, set to parent signal block - if element.is_port(): - element = element.get_parent() - #remove block, remove all involved connections - if element.is_block(): - for port in element.get_ports(): - map(self.remove_element, port.get_connections()) - self.get_elements().remove(element) - - def evaluate(self, expr): - """ - Evaluate the expression. - @param expr the string expression - @throw NotImplementedError - """ - raise NotImplementedError - - def validate(self): - """ - Validate the flow graph. - All connections and blocks must be valid. - """ - for c in self.get_elements(): - try: assert c.is_valid() - except AssertionError: self._add_error_message('Element "%s" is not valid.'%c) - - ############################################## - ## Import/Export Methods - ############################################## - def export_data(self): - """ - Export this flow graph to nested data. - Export all block and connection data. - @return a nested data odict - """ - import time - n = odict() - n['timestamp'] = time.ctime() - n['block'] = [block.export_data() for block in self.get_blocks()] - n['connection'] = [connection.export_data() for connection in self.get_connections()] - return odict({'flow_graph': n}) - - def import_data(self, n=None): - """ - Import blocks and connections into this flow graph. - Clear this flowgraph of all previous blocks and connections. - Any blocks or connections in error will be ignored. - @param n the nested data odict - """ - #remove previous elements - self._elements = list() - #use blank data if none provided - fg_n = n and n.find('flow_graph') or odict() - blocks_n = fg_n.findall('block') - connections_n = fg_n.findall('connection') - #create option block - self._options_block = self.get_parent().get_new_block(self, 'options') - #build the blocks - for block_n in blocks_n: - key = block_n.find('key') - if key == 'options': block = self._options_block - else: block = self.get_new_block(key) - #only load the block when the block key was valid - if block: block.import_data(block_n) - else: Messages.send_error_load('Block key "%s" not found in %s'%(key, self.get_parent())) - #build the connections - for connection_n in connections_n: - #try to make the connection - try: - #get the block ids - source_block_id = connection_n.find('source_block_id') - sink_block_id = connection_n.find('sink_block_id') - #get the port keys - source_key = connection_n.find('source_key') - sink_key = connection_n.find('sink_key') - #verify the blocks - block_ids = map(lambda b: b.get_id(), self.get_blocks()) - assert(source_block_id in block_ids) - assert(sink_block_id in block_ids) - #get the blocks - source_block = self.get_block(source_block_id) - sink_block = self.get_block(sink_block_id) - #verify the ports - assert(source_key in source_block.get_source_keys()) - assert(sink_key in sink_block.get_sink_keys()) - #get the ports - source = source_block.get_source(source_key) - sink = sink_block.get_sink(sink_key) - #build the connection - self.connect(source, sink) - except AssertionError: Messages.send_error_load('Connection between %s(%s) and %s(%s) could not be made.'%(source_block_id, source_key, sink_block_id, sink_key)) - self.validate() |