diff options
-rw-r--r-- | .gitmodules | 9 | ||||
m--------- | Apology | 0 | ||||
-rw-r--r-- | CMakeLists.txt | 61 | ||||
m--------- | PMC | 0 | ||||
m--------- | gnuradio | 0 | ||||
m--------- | gr36 | 0 | ||||
m--------- | gr37 | 0 | ||||
m--------- | grex | 0 | ||||
-rw-r--r-- | include/gras/block.hpp | 13 | ||||
-rw-r--r-- | include/gras/block.i | 1 | ||||
-rw-r--r-- | include/gras/block_config.hpp | 16 | ||||
-rw-r--r-- | include/gras/element.hpp | 18 | ||||
-rw-r--r-- | include/gras/element.i | 2 | ||||
-rw-r--r-- | include/gras/hier_block.hpp | 7 | ||||
-rw-r--r-- | include/gras/hier_block.i | 1 | ||||
-rw-r--r-- | include/gras/top_block.hpp | 12 | ||||
-rw-r--r-- | include/gras/top_block.i | 1 | ||||
-rw-r--r-- | lib/block.cpp | 31 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 10 | ||||
-rw-r--r-- | lib/block_config.cpp | 27 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 20 | ||||
-rw-r--r-- | lib/element.cpp | 15 | ||||
-rw-r--r-- | lib/element_impl.hpp | 6 | ||||
-rw-r--r-- | lib/gras_impl/block_data.hpp | 1 | ||||
-rw-r--r-- | lib/hier_block.cpp | 11 | ||||
-rw-r--r-- | lib/task_main.cpp | 2 | ||||
-rw-r--r-- | lib/top_block.cpp | 14 | ||||
-rw-r--r-- | lib/top_block_query.cpp | 12 | ||||
-rw-r--r-- | python/gras/GRAS_PyHierBlocks.i | 8 | ||||
-rw-r--r-- | tests/thread_pool_test.py | 3 |
30 files changed, 187 insertions, 114 deletions
diff --git a/.gitmodules b/.gitmodules index ae5e210..658111c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,9 +7,12 @@ [submodule "PMC"] path = PMC url = http://github.com/guruofquality/PMC.git -[submodule "gnuradio"] - path = gnuradio - url = http://github.com/guruofquality/gnuradio.git [submodule "grex"] path = grex url = http://github.com/guruofquality/grex.git +[submodule "gr36"] + path = gr36 + url = http://github.com/guruofquality/gnuradio.git +[submodule "gr37"] + path = gr37 + url = http://github.com/guruofquality/gnuradio.git diff --git a/Apology b/Apology -Subproject ab6fc7615056dd5b6c737e1f4005250fe028103 +Subproject 4b592f3d0c60b43f9dca3f9d7c3b501b7c40d3d diff --git a/CMakeLists.txt b/CMakeLists.txt index 7d7aa0b..dad3ac4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,8 @@ endfunction(GRAS_CHECK_SUBMODULE) GRAS_CHECK_SUBMODULE(Theron) GRAS_CHECK_SUBMODULE(Apology) GRAS_CHECK_SUBMODULE(PMC) -GRAS_CHECK_SUBMODULE(gnuradio) +GRAS_CHECK_SUBMODULE(gr36) +GRAS_CHECK_SUBMODULE(gr37) GRAS_CHECK_SUBMODULE(grex) list(APPEND CMAKE_MODULE_PATH ${GRAS_SOURCE_DIR}/PMC/cmake/Modules) @@ -107,8 +108,17 @@ add_subdirectory(cmake/Modules) ######################################################################## # add gnuradio as sub-project ######################################################################## -set(CMAKE_SOURCE_DIR ${GRAS_SOURCE_DIR}/gnuradio) -set(CMAKE_BINARY_DIR ${GRAS_BINARY_DIR}/gnuradio) +if(NOT GRDIR) + set(GRDIR gr36) +endif() +set(CMAKE_SOURCE_DIR ${GRAS_SOURCE_DIR}/${GRDIR}) +set(CMAKE_BINARY_DIR ${GRAS_BINARY_DIR}/${GRDIR}) + +set(GRAS_FOUND TRUE) +#GRAS_INCLUDE_DIRS, GRAS_LIBRARIES set above +set(PMC_FOUND TRUE) +set(PMC_INCLUDE_DIRS ${GRAS_SOURCE_DIR}/PMC/include) +set(PMC_LIBRARIES pmc) #Theron isnt affected by boost thread issues. #So we allow the gr black listed versions. @@ -118,7 +128,7 @@ set(CMAKE_PROJECT_NAME gnuradio) #for submodule vars set(GR_MOAR_LIBRARIES ${GRAS_LIBRARIES}) -add_subdirectory(gnuradio) +add_subdirectory(${GRDIR}) ######################################################################## # add GRAS to gnuradio cpack registry @@ -158,52 +168,39 @@ list(APPEND GRAS_TEST_ENVIRONS "GRAS_ROOT=${GRAS_SOURCE_DIR}") set(CMAKE_SOURCE_DIR ${GRAS_SOURCE_DIR}/grex) set(CMAKE_BINARY_DIR ${GRAS_BINARY_DIR}/grex) -set(GRAS_FOUND TRUE) -#GRAS_INCLUDE_DIRS, GRAS_LIBRARIES set above - -set(PMC_FOUND TRUE) -set(PMC_INCLUDE_DIRS ${GRAS_SOURCE_DIR}/PMC/include) -set(PMC_LIBRARIES pmc) - #call include to force local include precedence include_directories(${GRAS_INCLUDE_DIRS}) include_directories(${PMC_INCLUDE_DIRS}) set(VOLK_FOUND ${ENABLE_VOLK}) set(VOLK_INCLUDE_DIRS - ${GRAS_SOURCE_DIR}/gnuradio/volk/include - ${GRAS_BINARY_DIR}/gnuradio/volk/include + ${GRAS_SOURCE_DIR}/${GRDIR}/volk/include + ${GRAS_BINARY_DIR}/${GRDIR}/volk/include ) if(MSVC) #add compatibility includes for stdint types - list(APPEND VOLK_INCLUDE_DIRS ${GRAS_SOURCE_DIR}/gnuradio/volk/cmake/msvc) + list(APPEND VOLK_INCLUDE_DIRS ${GRAS_SOURCE_DIR}/${GRDIR}/volk/cmake/msvc) endif() set(VOLK_LIBRARIES volk) if(ENABLE_VOLK) list(APPEND GR_TEST_TARGET_DEPS volk) endif(ENABLE_VOLK) -set(GNURADIO_CORE_FOUND ${ENABLE_GR_CORE}) -#GNURADIO_CORE_INCLUDE_DIRS set global by gnuradio -set(GNURADIO_CORE_LIBRARIES gnuradio-core) -if(ENABLE_GR_CORE) - list(APPEND GR_TEST_PYTHON_DIRS - ${GRAS_SOURCE_DIR}/gnuradio/gruel/src/python - ${GRAS_BINARY_DIR}/gnuradio/gruel/src/swig - ${GRAS_SOURCE_DIR}/gnuradio/gnuradio-core/src/python - ${GRAS_BINARY_DIR}/gnuradio/gnuradio-core/src/lib/swig - ) -endif(ENABLE_GR_CORE) - #packet stuffs uses gr-digtal: if(ENABLE_GR_DIGITAL) list(APPEND GR_TEST_PYTHON_DIRS - ${GRAS_BINARY_DIR}/gnuradio/gr-digital/python - ${GRAS_BINARY_DIR}/gnuradio/gr-digital/swig - ${GRAS_BINARY_DIR}/gnuradio/gr-filter/python - ${GRAS_BINARY_DIR}/gnuradio/gr-filter/swig - ${GRAS_BINARY_DIR}/gnuradio/gr-analog/python - ${GRAS_BINARY_DIR}/gnuradio/gr-analog/swig + ${GRAS_BINARY_DIR}/${GRDIR}/gnuradio-runtime/python #3.7 + ${GRAS_BINARY_DIR}/${GRDIR}/gr-digital/python/digital #3.7 + ${GRAS_SOURCE_DIR}/${GRDIR}/gruel/src/python #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gruel/src/swig #3.6 + ${GRAS_SOURCE_DIR}/${GRDIR}/gnuradio-core/src/python #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gnuradio-core/src/lib/swig #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gr-digital/python #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gr-digital/swig #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gr-filter/python #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gr-filter/swig #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gr-analog/python #3.6 + ${GRAS_BINARY_DIR}/${GRDIR}/gr-analog/swig #3.6 ) list(APPEND GR_TEST_TARGET_DEPS gnuradio-digital gnuradio-filter gnuradio-fft gnuradio-analog) endif(ENABLE_GR_DIGITAL) diff --git a/PMC b/PMC -Subproject a08e9bc5a4d5290a179b42eabf913302119830a +Subproject a7c4df2b9d3707bc3e383164e7a6855c1ca9beb diff --git a/gnuradio b/gnuradio deleted file mode 160000 -Subproject 6961a794356d413f81bfbf347ed84619b1125d2 diff --git a/gr36 b/gr36 new file mode 160000 +Subproject e25fe808cf6a20e9469d1380d497e23205a379b diff --git a/gr37 b/gr37 new file mode 160000 +Subproject dbd9095bf783f9c883361f651a5252d98bca65d diff --git a/grex b/grex -Subproject 244b64b32834f31b19eb9bf2848eb9147919934 +Subproject 652cf7c19b01f37b294ad64eb0b483dc59d6ab7 diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 05809a1..1adc4c3 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -31,19 +31,6 @@ struct GRAS_API Block : Element * Deal with block configuration configuration ******************************************************************/ - /*! - * Set the thread pool of this block. - * Every block is created in the default active thread pool. - * This call will migrate the block to a new specified pool. - */ - void set_thread_pool(const ThreadPool &thread_pool); - - //! Get the global block config settings - const GlobalBlockConfig &global_config(void) const; - - //! Get the global block config settings - GlobalBlockConfig &global_config(void); - //! Get the configuration rules of an input port const InputPortConfig &input_config(const size_t which_input) const; diff --git a/include/gras/block.i b/include/gras/block.i index 8daa55a..1804c34 100644 --- a/include/gras/block.i +++ b/include/gras/block.i @@ -14,7 +14,6 @@ %include <gras/tag_iter.i> %import <gras/sbuffer.i> %include <gras/buffer_queue.hpp> -%include <gras/thread_pool.hpp> %include <gras/block_config.hpp> %include <gras/block.hpp> diff --git a/include/gras/block_config.hpp b/include/gras/block_config.hpp index 5e5c0cb..17ff182 100644 --- a/include/gras/block_config.hpp +++ b/include/gras/block_config.hpp @@ -4,6 +4,7 @@ #define INCLUDED_GRAS_BLOCK_CONFIG_HPP #include <gras/gras.hpp> +#include <gras/thread_pool.hpp> #include <cstddef> namespace gras @@ -15,6 +16,13 @@ struct GRAS_API GlobalBlockConfig GlobalBlockConfig(void); /*! + * Merge the settings from another config. + * Non-defaults on this config stay, + * defaults will be overwritten. + */ + void merge(const GlobalBlockConfig &config); + + /*! * Constrain the maximum number of items that * work can be called with for all output ports. * @@ -46,6 +54,14 @@ struct GRAS_API GlobalBlockConfig * Default = false. */ bool interruptible_work; + + /*! + * This member sets the thread pool for the block. + * The block's actor will migrate to the new pool. + * + * Default = null thread pool. + */ + ThreadPool thread_pool; }; //! Configuration parameters for an input port diff --git a/include/gras/element.hpp b/include/gras/element.hpp index df47bef..84f7062 100644 --- a/include/gras/element.hpp +++ b/include/gras/element.hpp @@ -10,6 +10,7 @@ #include <gras/gras.hpp> #include <gras/callable.hpp> +#include <gras/block_config.hpp> #include <gras/weak_container.hpp> #include <boost/shared_ptr.hpp> @@ -66,6 +67,23 @@ struct GRAS_API Element : Callable, boost::shared_ptr<ElementImpl> std::string to_string(void) const; /******************************************************************* + * config interface + ******************************************************************/ + + //! Get the global block config settings + const GlobalBlockConfig &global_config(void) const; + + //! Get the global block config settings + GlobalBlockConfig &global_config(void); + + /*! + * Commit changes to the global configuration. + * Call this after modifying the global config. + * Must be call to apply changes to the global config. + */ + virtual void commit_config(void); + + /******************************************************************* * identification interface ******************************************************************/ diff --git a/include/gras/element.i b/include/gras/element.i index d462370..e577e9d 100644 --- a/include/gras/element.i +++ b/include/gras/element.i @@ -27,6 +27,8 @@ namespace gras %include <std_string.i> %import <PMC/PMC.i> %include <gras/gras.hpp> +%include <gras/thread_pool.hpp> +%include <gras/block_config.hpp> %include <gras/callable.hpp> %include <gras/element.hpp> diff --git a/include/gras/hier_block.hpp b/include/gras/hier_block.hpp index 73eeabd..b2da9be 100644 --- a/include/gras/hier_block.hpp +++ b/include/gras/hier_block.hpp @@ -16,6 +16,13 @@ struct GRAS_API HierBlock : Element virtual ~HierBlock(void); + /*! + * Commit changes to the global configuration. + * Call this after modifying the global config. + * Must be call to apply changes to the global config. + */ + void commit_config(void); + /******************************************************************* * connection flow interface ******************************************************************/ diff --git a/include/gras/hier_block.i b/include/gras/hier_block.i index f89252e..4b617c2 100644 --- a/include/gras/hier_block.i +++ b/include/gras/hier_block.i @@ -9,6 +9,7 @@ #include <gras/hier_block.hpp> %} +%include <gras/gras.hpp> %import <gras/element.i> %include <gras/hier_block.hpp> diff --git a/include/gras/top_block.hpp b/include/gras/top_block.hpp index 415c03e..3a1b9e5 100644 --- a/include/gras/top_block.hpp +++ b/include/gras/top_block.hpp @@ -3,7 +3,6 @@ #ifndef INCLUDED_GRAS_TOP_BLOCK_HPP #define INCLUDED_GRAS_TOP_BLOCK_HPP -#include <gras/block_config.hpp> #include <gras/hier_block.hpp> namespace gras @@ -17,11 +16,12 @@ struct GRAS_API TopBlock : HierBlock virtual ~TopBlock(void); - //! Get the global block config settings - const GlobalBlockConfig &global_config(void) const; - - //! Get the global block config settings - GlobalBlockConfig &global_config(void); + /*! + * Commit changes to the global configuration. + * Call this after modifying the global config. + * Commit config is called automatically by start/stop/run. + */ + void commit_config(void); /*! * Commit changes to the overall flow graph. diff --git a/include/gras/top_block.i b/include/gras/top_block.i index 23a4275..07d2482 100644 --- a/include/gras/top_block.i +++ b/include/gras/top_block.i @@ -10,7 +10,6 @@ %} %include <gras/gras.hpp> -%include <gras/block_config.hpp> %import <gras/element.i> %import <gras/hier_block.i> %include <gras/top_block.hpp> diff --git a/lib/block.cpp b/lib/block.cpp index acb05d3..aa130d7 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -121,16 +121,6 @@ typename V::value_type &vector_get_resize(V &v, const size_t index) return v[index]; } -const GlobalBlockConfig &Block::global_config(void) const -{ - return (*this)->block_data->global_config; -} - -GlobalBlockConfig &Block::global_config(void) -{ - return (*this)->block_data->global_config; -} - InputPortConfig &Block::input_config(const size_t which_input) { return vector_get_resize((*this)->block_data->input_configs, which_input); @@ -154,6 +144,18 @@ const OutputPortConfig &Block::output_config(const size_t which_output) const void Block::commit_config(void) { Theron::Actor &actor = *((*this)->block_actor); + + //handle thread pool migration + const ThreadPool &thread_pool = this->global_config().thread_pool; + if (thread_pool and thread_pool != (*this)->block_actor->thread_pool) + { + boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor; + (*this)->block_actor.reset(BlockActor::make(thread_pool)); + (*this)->setup_actor(); + wait_actor_idle((*this)->repr, *old_actor); + } + + //update messages for in and out ports for (size_t i = 0; i < (*this)->worker->get_num_inputs(); i++) { InputUpdateMessage message; @@ -166,7 +168,6 @@ void Block::commit_config(void) message.index = i; actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); } - } void Block::notify_active(void) @@ -183,11 +184,3 @@ void Block::notify_topology(const size_t, const size_t) { return; } - -void Block::set_thread_pool(const ThreadPool &thread_pool) -{ - boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor; - (*this)->block_actor.reset(BlockActor::make(thread_pool)); - (*this)->setup_actor(); - wait_actor_idle((*this)->repr, *old_actor); -} diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 8246037..6f59ef6 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -61,11 +61,15 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address const size_t num_outputs = worker->get_num_outputs(); for (size_t i = 0; i < num_outputs; i++) { + size_t reserve_items = data->output_configs[i].reserve_items; + size_t maximum_items = data->output_configs[i].maximum_items; + if (maximum_items == 0) maximum_items = data->block->global_config().maximum_output_items; + const size_t bytes = recommend_length( data->output_allocation_hints[i], my_round_up_mult(AT_LEAST_BYTES, data->output_configs[i].item_size), - data->output_configs[i].reserve_items*data->output_configs[i].item_size, - data->output_configs[i].maximum_items*data->output_configs[i].item_size + reserve_items*data->output_configs[i].item_size, + maximum_items*data->output_configs[i].item_size ); SBufferDeleter deleter = boost::bind(&buffer_returner, this->thread_pool, this->GetAddress(), i, _1); @@ -74,7 +78,7 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address SBufferConfig config; config.memory = NULL; config.length = bytes; - config.affinity = data->global_config.buffer_affinity; + config.affinity = data->block->global_config().buffer_affinity; config.token = token; BufferQueueSptr queue = data->block->output_buffer_allocator(i, config); diff --git a/lib/block_config.cpp b/lib/block_config.cpp index 165ab47..227ae08 100644 --- a/lib/block_config.cpp +++ b/lib/block_config.cpp @@ -11,6 +11,33 @@ GlobalBlockConfig::GlobalBlockConfig(void) interruptible_work = false; } +void GlobalBlockConfig::merge(const GlobalBlockConfig &config) +{ + //overwrite with config's max items only if maxium_items is not set (zero) + if (this->maximum_output_items == 0) + { + this->maximum_output_items = config.maximum_output_items; + } + + //overwrite with config's node affinity setting for buffers if not set + if (this->buffer_affinity == -1) + { + this->buffer_affinity = config.buffer_affinity; + } + + //overwrite with config's interruptable setting for work if not set + if (this->interruptible_work == false) + { + this->interruptible_work = config.interruptible_work; + } + + //overwrite with config's thread pool for actor if not set + if (not this->thread_pool) + { + this->thread_pool = config.thread_pool; + } +} + InputPortConfig::InputPortConfig(void) { item_size = 1; diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index ca1ad97..57e24c1 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -83,29 +83,19 @@ void BlockActor::handle_top_config( const Theron::Address from ){ MESSAGE_TRACER(); - const GlobalBlockConfig &config = message.config; + + //merge in the non-defaults + data->block->global_config().merge(message.config); //overwrite with global config only if maxium_items is not set (zero) for (size_t i = 0; i < data->output_configs.size(); i++) { if (data->output_configs[i].maximum_items == 0) { - data->output_configs[i].maximum_items = config.maximum_output_items; + data->output_configs[i].maximum_items = data->block->global_config().maximum_output_items; } } - //overwrite with global node affinity setting for buffers if not set - if (data->global_config.buffer_affinity == -1) - { - data->global_config.buffer_affinity = config.buffer_affinity; - } - - //overwrite with global interruptable setting for work if not set - if (data->global_config.interruptible_work == false) - { - data->global_config.interruptible_work = config.interruptible_work; - } - this->Send(0, from); //ACK } @@ -120,7 +110,7 @@ void BlockActor::handle_top_thread_group( //spawn a new thread if this block is a source data->thread_group = message.thread_group; data->interruptible_thread.reset(); //erase old one - if (data->global_config.interruptible_work) + if (data->block->global_config().interruptible_work) { data->interruptible_thread = boost::make_shared<InterruptibleThread>( data->thread_group, boost::bind(&BlockActor::task_work, this) diff --git a/lib/element.cpp b/lib/element.cpp index 9f3c5fb..efd8dfd 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -84,6 +84,21 @@ std::string Element::to_string(void) const return (*this)->repr; } +const GlobalBlockConfig &Element::global_config(void) const +{ + return (*this)->global_config; +} + +GlobalBlockConfig &Element::global_config(void) +{ + return (*this)->global_config; +} + +void Element::commit_config(void) +{ + //NOP -- this call gets overridden +} + void Element::adopt_element(const std::string &name, const Element &child) { if (child->parent) throw std::invalid_argument(str(boost::format( diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 91497ed..dca7963 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -38,7 +38,7 @@ struct ElementImpl //top block stuff SharedThreadGroup thread_group; Token token; - GlobalBlockConfig top_config; + GlobalBlockConfig global_config; //element tree stuff Element parent; @@ -63,14 +63,14 @@ struct ElementImpl void bcast_prio_msg(const MessageType &msg) { Theron::Receiver receiver; - BOOST_FOREACH(Apology::Worker *w, this->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, this->topology->get_workers()) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); MessageType message = msg; message.prio_token = actor->prio_token; actor->GetFramework().Send(message, receiver.GetAddress(), actor->GetAddress()); } - size_t outstandingCount(this->executor->get_workers().size()); + size_t outstandingCount(this->topology->get_workers().size()); while (outstandingCount != 0) { outstandingCount -= receiver.Wait(outstandingCount); diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp index d6af53d..f4dd9bd 100644 --- a/lib/gras_impl/block_data.hpp +++ b/lib/gras_impl/block_data.hpp @@ -69,7 +69,6 @@ struct BlockData //is the fg running? BlockState block_state; - GlobalBlockConfig global_config; std::vector<std::vector<OutputHintMessage> > output_allocation_hints; diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index abbdec1..5ca74a9 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -3,6 +3,7 @@ #include "element_impl.hpp" #include <gras/hier_block.hpp> #include <boost/format.hpp> +#include <boost/foreach.hpp> #include <exception> using namespace gras; @@ -23,6 +24,16 @@ HierBlock::~HierBlock(void) //NOP } +void HierBlock::commit_config(void) +{ + BOOST_FOREACH(Apology::Worker *w, (*this)->topology->get_workers()) + { + BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); + actor->data->block->global_config().merge((*this)->global_config); + actor->data->block->commit_config(); + } +} + void ElementImpl::hier_block_cleanup(void) { this->topology->clear_all(); diff --git a/lib/task_main.cpp b/lib/task_main.cpp index 5547b08..803a5e3 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -106,7 +106,7 @@ void BlockActor::task_main(void) buff.unique() and data->input_configs[i].inline_buffer and output_inline_index < num_outputs and - buff.get_affinity() == data->global_config.buffer_affinity + buff.get_affinity() == data->block->global_config().buffer_affinity ){ data->output_queues.set_inline(output_inline_index++, buff); } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 0c8572d..4892e77 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -31,18 +31,14 @@ void ElementImpl::top_block_cleanup(void) this->executor->commit(); } -const GlobalBlockConfig &TopBlock::global_config(void) const +void TopBlock::commit_config(void) { - return (*this)->top_config; -} - -GlobalBlockConfig &TopBlock::global_config(void) -{ - return (*this)->top_config; + HierBlock::commit_config(); } void TopBlock::commit(void) { + this->commit_config(); this->start(); //ok to re-start, means update } @@ -62,7 +58,7 @@ void TopBlock::start(void) { //send the global block config before alloc TopConfigMessage message; - message.config = (*this)->top_config; + message.config = (*this)->global_config; (*this)->bcast_prio_msg(message); } { @@ -127,7 +123,7 @@ void TopBlock::wait(void) } //loop through blocks looking for non-done blocks with done inputs - BOOST_FOREACH(Apology::Worker *w, (*this)->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, (*this)->topology->get_workers()) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true; diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp index ccc5c29..68a56ef 100644 --- a/lib/top_block_query.cpp +++ b/lib/top_block_query.cpp @@ -31,7 +31,7 @@ static ptree query_blocks(ElementImpl *self, const ptree &) { ptree root; ptree e; - BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->topology->get_workers()) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); ptree prop_e; @@ -61,7 +61,7 @@ static ptree query_stats(ElementImpl *self, const ptree &query) //get stats with custom receiver and set high prio GetStatsReceiver receiver; size_t outstandingCount(0); - BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->topology->get_workers()) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); @@ -93,7 +93,7 @@ static ptree query_stats(ElementImpl *self, const ptree &query) //thread pool counts std::set<ThreadPool> thread_pools; - BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->topology->get_workers()) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); thread_pools.insert(actor->thread_pool); @@ -161,7 +161,7 @@ static ptree query_calls(ElementImpl *self, const ptree &query) ptree root; const std::string block_id = query.get<std::string>("block"); const std::string call_name = query.get<std::string>("name"); - BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->topology->get_workers()) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); if (actor->data->block->get_uid() != block_id) continue; @@ -189,7 +189,7 @@ static std::string query_topology(ElementImpl *self, const ptree &query) buff += "rankdir=LR;\n"; buff += "node [shape=record, fontsize=10];\n"; - BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers()) + BOOST_FOREACH(Apology::Worker *w, self->topology->get_workers()) { BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor()); std::string in_ports_str, out_ports_str; @@ -222,7 +222,7 @@ static std::string query_topology(ElementImpl *self, const ptree &query) ); } - BOOST_FOREACH(const Apology::Flow &flow, self->executor->get_flat_flows()) + BOOST_FOREACH(const Apology::Flow &flow, self->topology->get_flat_flows()) { buff += str(boost::format("%u:out%u -> %u:in%u;\n") % dynamic_cast<const Apology::Worker *>(flow.src.elem)->get_actor()->GetAddress().AsInteger() diff --git a/python/gras/GRAS_PyHierBlocks.i b/python/gras/GRAS_PyHierBlocks.i index 9f01397..8251336 100644 --- a/python/gras/GRAS_PyHierBlocks.i +++ b/python/gras/GRAS_PyHierBlocks.i @@ -139,6 +139,14 @@ struct HierBlockPython : HierBlock def to_element(obj): try: + + #BEGIN basic block support: + if not hasattr(obj, 'to_element') and hasattr(obj, 'to_basic_block'): + elem = obj.to_basic_block().to_element() + set_weak_py_self(elem, obj) + return elem + #END basic block support. + elem = obj.to_element() set_weak_py_self(elem, obj) return elem diff --git a/tests/thread_pool_test.py b/tests/thread_pool_test.py index 58a00fe..56e0640 100644 --- a/tests/thread_pool_test.py +++ b/tests/thread_pool_test.py @@ -33,7 +33,8 @@ class ThreadPoolTest(unittest.TestCase): c = gras.ThreadPoolConfig() tp = gras.ThreadPool(c) - vec_source.set_thread_pool(tp) + vec_source.global_config().thread_pool = tp + vec_source.commit_config() tb.connect(vec_source, vec_sink) tb.run() |