diff options
-rw-r--r-- | include/gras/block.hpp | 8 | ||||
-rw-r--r-- | include/gras/block.i | 1 | ||||
-rw-r--r-- | lib/block.cpp | 10 | ||||
-rw-r--r-- | lib/block_actor.cpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 65 | ||||
-rw-r--r-- | lib/gras_impl/block_data.hpp | 83 | ||||
-rw-r--r-- | tests/thread_pool_test.py | 15 |
7 files changed, 120 insertions, 66 deletions
diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 378358e..5a6e5e2 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -5,6 +5,7 @@ #include <gras/element.hpp> #include <gras/sbuffer.hpp> +#include <gras/thread_pool.hpp> #include <gras/tag_iter.hpp> #include <gras/tags.hpp> #include <gras/work_buffer.hpp> @@ -488,6 +489,13 @@ struct GRAS_API Block : Element ******************************************************************/ /*! + * Set the thread pool of this block. + * Every block is created in the default active thread pool. + * This call will migrate the block to a new specified pool. + */ + void set_thread_pool(const ThreadPool &thread_pool); + + /*! * Set if the work call should be interruptible by stop(). * Some work implementations block with the expectation of * getting a boost thread interrupt in a blocking call. diff --git a/include/gras/block.i b/include/gras/block.i index cce0d15..5c62f7e 100644 --- a/include/gras/block.i +++ b/include/gras/block.i @@ -12,6 +12,7 @@ %include <gras/tag_iter.i> %import <gras/sbuffer.i> %include <gras/buffer_queue.hpp> +%include <gras/thread_pool.hpp> %include <gras/block.hpp> //////////////////////////////////////////////////////////////////////// diff --git a/lib/block.cpp b/lib/block.cpp index 0059569..835b979 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -36,10 +36,11 @@ Block::Block(const std::string &name): (*this)->block->prio_token = Token::make(); (*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool (*this)->block->name = name; //for debug purposes + (*this)->block->block_ptr = this; + (*this)->block->data.reset(new BlockData()); //setup some state variables - (*this)->block->block_ptr = this; - (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT; + (*this)->block->data = BlockActor::BLOCK_STATE_INIT; //call block methods to init stuff this->set_interruptible_work(false); @@ -183,6 +184,11 @@ void Block::notify_topology(const size_t, const size_t) return; } +void Block::set_thread_pool(const ThreadPool &thread_pool) +{ + //TODO +} + void Block::set_buffer_affinity(const long affinity) { (*this)->block->buffer_affinity = affinity; diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index 464167f..2c7f0fd 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -34,8 +34,8 @@ ThreadPool get_active_thread_pool(void) * Block actor construction - gets active framework **********************************************************************/ -BlockActor::BlockActor(void): - Apology::Worker(*get_active_thread_pool()) +BlockActor::BlockActor(const ThreadPool &tp): + Apology::Worker((tp)? *tp : *get_active_thread_pool()) { const char * gras_tpp = getenv("GRAS_TPP"); if (gras_tpp != NULL) diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index a7ed6c5..005855e 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -4,21 +4,12 @@ #define INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP #include <gras_impl/debug.hpp> -#include <gras_impl/bitset.hpp> -#include <gras/gras.hpp> #include <gras/block.hpp> #include <gras/top_block.hpp> #include <gras/thread_pool.hpp> #include <Apology/Worker.hpp> -#include <gras_impl/token.hpp> -#include <gras_impl/stats.hpp> #include <gras_impl/messages.hpp> -#include <gras_impl/output_buffer_queues.hpp> -#include <gras_impl/input_buffer_queues.hpp> -#include <gras_impl/interruptible_thread.hpp> -#include <vector> -#include <set> -#include <map> +#include <gras_impl/block_data.hpp> namespace gras { @@ -32,12 +23,13 @@ struct PropertyRegistryPair struct BlockActor : Apology::Worker { - BlockActor(void); + BlockActor(const ThreadPool &tp = ThreadPool()); ~BlockActor(void); Block *block_ptr; std::string name; //for debug ThreadPool thread_pool; Token prio_token; + boost::shared_ptr<BlockData> data; //do it here so we can match w/ the handler declarations void register_handlers(void) @@ -116,62 +108,11 @@ struct BlockActor : Apology::Worker bool is_input_done(const size_t index); bool is_work_allowed(void); - //per port properties - std::vector<InputPortConfig> input_configs; - std::vector<OutputPortConfig> output_configs; - - //work buffers for the new work interface - Block::InputItems input_items; - Block::OutputItems output_items; - - //track the subscriber counts - std::vector<Token> input_tokens; - std::vector<Token> output_tokens; - BitSet inputs_done; - BitSet outputs_done; - std::set<Token> token_pool; - - //buffer queues and ready conditions - InputBufferQueues input_queues; - OutputBufferQueues output_queues; - std::vector<bool> produce_outputs; - BitSet inputs_available; - std::vector<time_ticks_t> time_input_not_ready; - std::vector<time_ticks_t> time_output_not_ready; - - //tag and msg tracking - std::vector<bool> input_tags_changed; - std::vector<std::vector<Tag> > input_tags; - std::vector<size_t> num_input_msgs_read; - std::vector<std::vector<PMCC> > input_msgs; - - //interruptible thread stuff - bool interruptible_work; - SharedThreadGroup thread_group; - boost::shared_ptr<InterruptibleThread> interruptible_thread; - //work helpers inline void task_work(void) { block_ptr->work(this->input_items, this->output_items); } - - //is the fg running? - enum - { - BLOCK_STATE_INIT, - BLOCK_STATE_LIVE, - BLOCK_STATE_DONE, - } block_state; - long buffer_affinity; - - std::vector<std::vector<OutputHintMessage> > output_allocation_hints; - - //property stuff - std::map<std::string, PropertyRegistryPair> property_registry; - PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set); - - BlockStats stats; }; //-------------- common functions from this BlockActor class ---------// diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp new file mode 100644 index 0000000..2dcf3c6 --- /dev/null +++ b/lib/gras_impl/block_data.hpp @@ -0,0 +1,83 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#ifndef INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP +#define INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP + +#include <gras_impl/debug.hpp> +#include <gras_impl/bitset.hpp> +#include <gras_impl/token.hpp> +#include <gras_impl/stats.hpp> +#include <gras_impl/output_buffer_queues.hpp> +#include <gras_impl/input_buffer_queues.hpp> +#include <gras_impl/interruptible_thread.hpp> +#include <vector> +#include <set> +#include <map> + +namespace gras +{ + +typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr; +struct PropertyRegistryPair +{ + PropertyRegistrySptr setter; + PropertyRegistrySptr getter; +}; + +struct BlockData +{ + //per port properties + std::vector<InputPortConfig> input_configs; + std::vector<OutputPortConfig> output_configs; + + //work buffers for the new work interface + Block::InputItems input_items; + Block::OutputItems output_items; + + //track the subscriber counts + std::vector<Token> input_tokens; + std::vector<Token> output_tokens; + BitSet inputs_done; + BitSet outputs_done; + std::set<Token> token_pool; + + //buffer queues and ready conditions + InputBufferQueues input_queues; + OutputBufferQueues output_queues; + std::vector<bool> produce_outputs; + BitSet inputs_available; + std::vector<time_ticks_t> time_input_not_ready; + std::vector<time_ticks_t> time_output_not_ready; + + //tag and msg tracking + std::vector<bool> input_tags_changed; + std::vector<std::vector<Tag> > input_tags; + std::vector<size_t> num_input_msgs_read; + std::vector<std::vector<PMCC> > input_msgs; + + //interruptible thread stuff + bool interruptible_work; + SharedThreadGroup thread_group; + boost::shared_ptr<InterruptibleThread> interruptible_thread; + + //is the fg running? + enum + { + BLOCK_STATE_INIT, + BLOCK_STATE_LIVE, + BLOCK_STATE_DONE, + } block_state; + long buffer_affinity; + + std::vector<std::vector<OutputHintMessage> > output_allocation_hints; + + //property stuff + std::map<std::string, PropertyRegistryPair> property_registry; + PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set); + + BlockStats stats; +}; + +} //namespace gras + +#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP*/ diff --git a/tests/thread_pool_test.py b/tests/thread_pool_test.py index 0f8b513..2765b07 100644 --- a/tests/thread_pool_test.py +++ b/tests/thread_pool_test.py @@ -2,6 +2,7 @@ import unittest import gras +from demo_blocks import * class ThreadPoolTest(unittest.TestCase): @@ -23,5 +24,19 @@ class ThreadPoolTest(unittest.TestCase): #here we assume prio 0.0 (default) can always be set self.assertTrue(gras.ThreadPool.test_thread_priority(0.0)) + def test_migrate_to_thread_pool(self): + tb = gras.TopBlock() + vec_source = VectorSource(numpy.uint32, [0, 9, 8, 7, 6]) + vec_sink = VectorSink(numpy.uint32) + + c = gras.ThreadPoolConfig() + tp = gras.ThreadPool(c) + + vec_source.set_thread_pool(tp) + tb.connect(vec_source, vec_sink) + tb.run() + + self.assertEqual(vec_sink.get_vector(), (0, 9, 8, 7, 6)) + if __name__ == '__main__': unittest.main() |