summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/gras/block.hpp8
-rw-r--r--include/gras/block.i1
-rw-r--r--lib/block.cpp10
-rw-r--r--lib/block_actor.cpp4
-rw-r--r--lib/gras_impl/block_actor.hpp65
-rw-r--r--lib/gras_impl/block_data.hpp83
-rw-r--r--tests/thread_pool_test.py15
7 files changed, 120 insertions, 66 deletions
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index 378358e..5a6e5e2 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -5,6 +5,7 @@
#include <gras/element.hpp>
#include <gras/sbuffer.hpp>
+#include <gras/thread_pool.hpp>
#include <gras/tag_iter.hpp>
#include <gras/tags.hpp>
#include <gras/work_buffer.hpp>
@@ -488,6 +489,13 @@ struct GRAS_API Block : Element
******************************************************************/
/*!
+ * Set the thread pool of this block.
+ * Every block is created in the default active thread pool.
+ * This call will migrate the block to a new specified pool.
+ */
+ void set_thread_pool(const ThreadPool &thread_pool);
+
+ /*!
* Set if the work call should be interruptible by stop().
* Some work implementations block with the expectation of
* getting a boost thread interrupt in a blocking call.
diff --git a/include/gras/block.i b/include/gras/block.i
index cce0d15..5c62f7e 100644
--- a/include/gras/block.i
+++ b/include/gras/block.i
@@ -12,6 +12,7 @@
%include <gras/tag_iter.i>
%import <gras/sbuffer.i>
%include <gras/buffer_queue.hpp>
+%include <gras/thread_pool.hpp>
%include <gras/block.hpp>
////////////////////////////////////////////////////////////////////////
diff --git a/lib/block.cpp b/lib/block.cpp
index 0059569..835b979 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -36,10 +36,11 @@ Block::Block(const std::string &name):
(*this)->block->prio_token = Token::make();
(*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool
(*this)->block->name = name; //for debug purposes
+ (*this)->block->block_ptr = this;
+ (*this)->block->data.reset(new BlockData());
//setup some state variables
- (*this)->block->block_ptr = this;
- (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT;
+ (*this)->block->data = BlockActor::BLOCK_STATE_INIT;
//call block methods to init stuff
this->set_interruptible_work(false);
@@ -183,6 +184,11 @@ void Block::notify_topology(const size_t, const size_t)
return;
}
+void Block::set_thread_pool(const ThreadPool &thread_pool)
+{
+ //TODO
+}
+
void Block::set_buffer_affinity(const long affinity)
{
(*this)->block->buffer_affinity = affinity;
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index 464167f..2c7f0fd 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -34,8 +34,8 @@ ThreadPool get_active_thread_pool(void)
* Block actor construction - gets active framework
**********************************************************************/
-BlockActor::BlockActor(void):
- Apology::Worker(*get_active_thread_pool())
+BlockActor::BlockActor(const ThreadPool &tp):
+ Apology::Worker((tp)? *tp : *get_active_thread_pool())
{
const char * gras_tpp = getenv("GRAS_TPP");
if (gras_tpp != NULL)
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index a7ed6c5..005855e 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -4,21 +4,12 @@
#define INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP
#include <gras_impl/debug.hpp>
-#include <gras_impl/bitset.hpp>
-#include <gras/gras.hpp>
#include <gras/block.hpp>
#include <gras/top_block.hpp>
#include <gras/thread_pool.hpp>
#include <Apology/Worker.hpp>
-#include <gras_impl/token.hpp>
-#include <gras_impl/stats.hpp>
#include <gras_impl/messages.hpp>
-#include <gras_impl/output_buffer_queues.hpp>
-#include <gras_impl/input_buffer_queues.hpp>
-#include <gras_impl/interruptible_thread.hpp>
-#include <vector>
-#include <set>
-#include <map>
+#include <gras_impl/block_data.hpp>
namespace gras
{
@@ -32,12 +23,13 @@ struct PropertyRegistryPair
struct BlockActor : Apology::Worker
{
- BlockActor(void);
+ BlockActor(const ThreadPool &tp = ThreadPool());
~BlockActor(void);
Block *block_ptr;
std::string name; //for debug
ThreadPool thread_pool;
Token prio_token;
+ boost::shared_ptr<BlockData> data;
//do it here so we can match w/ the handler declarations
void register_handlers(void)
@@ -116,62 +108,11 @@ struct BlockActor : Apology::Worker
bool is_input_done(const size_t index);
bool is_work_allowed(void);
- //per port properties
- std::vector<InputPortConfig> input_configs;
- std::vector<OutputPortConfig> output_configs;
-
- //work buffers for the new work interface
- Block::InputItems input_items;
- Block::OutputItems output_items;
-
- //track the subscriber counts
- std::vector<Token> input_tokens;
- std::vector<Token> output_tokens;
- BitSet inputs_done;
- BitSet outputs_done;
- std::set<Token> token_pool;
-
- //buffer queues and ready conditions
- InputBufferQueues input_queues;
- OutputBufferQueues output_queues;
- std::vector<bool> produce_outputs;
- BitSet inputs_available;
- std::vector<time_ticks_t> time_input_not_ready;
- std::vector<time_ticks_t> time_output_not_ready;
-
- //tag and msg tracking
- std::vector<bool> input_tags_changed;
- std::vector<std::vector<Tag> > input_tags;
- std::vector<size_t> num_input_msgs_read;
- std::vector<std::vector<PMCC> > input_msgs;
-
- //interruptible thread stuff
- bool interruptible_work;
- SharedThreadGroup thread_group;
- boost::shared_ptr<InterruptibleThread> interruptible_thread;
-
//work helpers
inline void task_work(void)
{
block_ptr->work(this->input_items, this->output_items);
}
-
- //is the fg running?
- enum
- {
- BLOCK_STATE_INIT,
- BLOCK_STATE_LIVE,
- BLOCK_STATE_DONE,
- } block_state;
- long buffer_affinity;
-
- std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
-
- //property stuff
- std::map<std::string, PropertyRegistryPair> property_registry;
- PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set);
-
- BlockStats stats;
};
//-------------- common functions from this BlockActor class ---------//
diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp
new file mode 100644
index 0000000..2dcf3c6
--- /dev/null
+++ b/lib/gras_impl/block_data.hpp
@@ -0,0 +1,83 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#ifndef INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP
+#define INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP
+
+#include <gras_impl/debug.hpp>
+#include <gras_impl/bitset.hpp>
+#include <gras_impl/token.hpp>
+#include <gras_impl/stats.hpp>
+#include <gras_impl/output_buffer_queues.hpp>
+#include <gras_impl/input_buffer_queues.hpp>
+#include <gras_impl/interruptible_thread.hpp>
+#include <vector>
+#include <set>
+#include <map>
+
+namespace gras
+{
+
+typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr;
+struct PropertyRegistryPair
+{
+ PropertyRegistrySptr setter;
+ PropertyRegistrySptr getter;
+};
+
+struct BlockData
+{
+ //per port properties
+ std::vector<InputPortConfig> input_configs;
+ std::vector<OutputPortConfig> output_configs;
+
+ //work buffers for the new work interface
+ Block::InputItems input_items;
+ Block::OutputItems output_items;
+
+ //track the subscriber counts
+ std::vector<Token> input_tokens;
+ std::vector<Token> output_tokens;
+ BitSet inputs_done;
+ BitSet outputs_done;
+ std::set<Token> token_pool;
+
+ //buffer queues and ready conditions
+ InputBufferQueues input_queues;
+ OutputBufferQueues output_queues;
+ std::vector<bool> produce_outputs;
+ BitSet inputs_available;
+ std::vector<time_ticks_t> time_input_not_ready;
+ std::vector<time_ticks_t> time_output_not_ready;
+
+ //tag and msg tracking
+ std::vector<bool> input_tags_changed;
+ std::vector<std::vector<Tag> > input_tags;
+ std::vector<size_t> num_input_msgs_read;
+ std::vector<std::vector<PMCC> > input_msgs;
+
+ //interruptible thread stuff
+ bool interruptible_work;
+ SharedThreadGroup thread_group;
+ boost::shared_ptr<InterruptibleThread> interruptible_thread;
+
+ //is the fg running?
+ enum
+ {
+ BLOCK_STATE_INIT,
+ BLOCK_STATE_LIVE,
+ BLOCK_STATE_DONE,
+ } block_state;
+ long buffer_affinity;
+
+ std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
+
+ //property stuff
+ std::map<std::string, PropertyRegistryPair> property_registry;
+ PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set);
+
+ BlockStats stats;
+};
+
+} //namespace gras
+
+#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP*/
diff --git a/tests/thread_pool_test.py b/tests/thread_pool_test.py
index 0f8b513..2765b07 100644
--- a/tests/thread_pool_test.py
+++ b/tests/thread_pool_test.py
@@ -2,6 +2,7 @@
import unittest
import gras
+from demo_blocks import *
class ThreadPoolTest(unittest.TestCase):
@@ -23,5 +24,19 @@ class ThreadPoolTest(unittest.TestCase):
#here we assume prio 0.0 (default) can always be set
self.assertTrue(gras.ThreadPool.test_thread_priority(0.0))
+ def test_migrate_to_thread_pool(self):
+ tb = gras.TopBlock()
+ vec_source = VectorSource(numpy.uint32, [0, 9, 8, 7, 6])
+ vec_sink = VectorSink(numpy.uint32)
+
+ c = gras.ThreadPoolConfig()
+ tp = gras.ThreadPool(c)
+
+ vec_source.set_thread_pool(tp)
+ tb.connect(vec_source, vec_sink)
+ tb.run()
+
+ self.assertEqual(vec_sink.get_vector(), (0, 9, 8, 7, 6))
+
if __name__ == '__main__':
unittest.main()