summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2013-06-06 13:45:50 -0700
committerJosh Blum2013-06-06 13:45:50 -0700
commitb7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6 (patch)
tree6ce12ebd668d120823c652f8b09d055a149d70dc
parent7889847eed1e8bc003b88b0d6ad4f7904873d2ac (diff)
parent7350e18b8d5090349390f54b76a0e251b66ce619 (diff)
downloadsandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.gz
sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.bz2
sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.zip
Merge branch 'actor_migration'
m---------Apology0
-rw-r--r--include/gras/block.hpp8
-rw-r--r--include/gras/block.i1
-rw-r--r--lib/block.cpp64
-rw-r--r--lib/block_actor.cpp5
-rw-r--r--lib/block_allocator.cpp18
-rw-r--r--lib/block_consume.cpp16
-rw-r--r--lib/block_handlers.cpp82
-rw-r--r--lib/block_message.cpp24
-rw-r--r--lib/block_produce.cpp42
-rw-r--r--lib/block_props.cpp12
-rw-r--r--lib/element.cpp4
-rw-r--r--lib/element_impl.hpp19
-rw-r--r--lib/gras_impl/block_actor.hpp98
-rw-r--r--lib/gras_impl/block_data.hpp88
-rw-r--r--lib/gras_impl/debug.hpp2
-rw-r--r--lib/hier_block.cpp2
-rw-r--r--lib/input_handlers.cpp48
-rw-r--r--lib/output_handlers.cpp36
-rw-r--r--lib/tag_handlers.hpp18
-rw-r--r--lib/task_done.cpp44
-rw-r--r--lib/task_fail.cpp22
-rw-r--r--lib/task_main.cpp78
-rw-r--r--lib/top_block.cpp2
-rw-r--r--lib/top_block_query.cpp30
-rw-r--r--lib/topology_handler.cpp58
-rw-r--r--tests/thread_pool_test.py15
27 files changed, 455 insertions, 381 deletions
diff --git a/Apology b/Apology
-Subproject 2da9fad22c9f55dced0c8467787e1a570d7ed52
+Subproject 2216c9c098ce50fc3813ad4fae53e60d94e9649
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index 378358e..5a6e5e2 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -5,6 +5,7 @@
#include <gras/element.hpp>
#include <gras/sbuffer.hpp>
+#include <gras/thread_pool.hpp>
#include <gras/tag_iter.hpp>
#include <gras/tags.hpp>
#include <gras/work_buffer.hpp>
@@ -488,6 +489,13 @@ struct GRAS_API Block : Element
******************************************************************/
/*!
+ * Set the thread pool of this block.
+ * Every block is created in the default active thread pool.
+ * This call will migrate the block to a new specified pool.
+ */
+ void set_thread_pool(const ThreadPool &thread_pool);
+
+ /*!
* Set if the work call should be interruptible by stop().
* Some work implementations block with the expectation of
* getting a boost thread interrupt in a blocking call.
diff --git a/include/gras/block.i b/include/gras/block.i
index cce0d15..5c62f7e 100644
--- a/include/gras/block.i
+++ b/include/gras/block.i
@@ -12,6 +12,7 @@
%include <gras/tag_iter.i>
%import <gras/sbuffer.i>
%include <gras/buffer_queue.hpp>
+%include <gras/thread_pool.hpp>
%include <gras/block.hpp>
////////////////////////////////////////////////////////////////////////
diff --git a/lib/block.cpp b/lib/block.cpp
index 0059569..00238ba 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -32,14 +32,17 @@ Block::Block(void)
Block::Block(const std::string &name):
Element(name)
{
- (*this)->block.reset(new BlockActor());
- (*this)->block->prio_token = Token::make();
- (*this)->thread_pool = (*this)->block->thread_pool; //ref copy of pool
- (*this)->block->name = name; //for debug purposes
+ //create non-actor containers
+ (*this)->block_data.reset(new BlockData());
+ (*this)->block_data->block = this;
+ (*this)->worker.reset(new Apology::Worker());
+
+ //create actor and init members
+ (*this)->block_actor.reset(new BlockActor());
+ (*this)->setup_actor();
//setup some state variables
- (*this)->block->block_ptr = this;
- (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT;
+ (*this)->block_data->block_state = BLOCK_STATE_INIT;
//call block methods to init stuff
this->set_interruptible_work(false);
@@ -51,6 +54,15 @@ Block::~Block(void)
//NOP
}
+void ElementImpl::setup_actor(void)
+{
+ this->block_actor->worker = this->worker.get();
+ this->block_actor->name = name; //for debug purposes
+ this->block_actor->data = this->block_data;
+ this->worker->set_actor(this->block_actor.get());
+ this->thread_pool = this->block_actor->thread_pool; //ref copy of pool
+}
+
enum block_cleanup_state_type
{
BLOCK_CLEANUP_WAIT,
@@ -59,11 +71,11 @@ enum block_cleanup_state_type
BLOCK_CLEANUP_DOTS,
};
-static void wait_block_cleanup(ElementImpl &self)
+static void wait_actor_idle(const std::string &repr, Theron::Actor &actor)
{
const boost::system_time start = boost::get_system_time();
block_cleanup_state_type state = BLOCK_CLEANUP_WAIT;
- while (self.block->GetNumQueuedMessages())
+ while (actor.GetNumQueuedMessages())
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
switch (state)
@@ -71,7 +83,7 @@ static void wait_block_cleanup(ElementImpl &self)
case BLOCK_CLEANUP_WAIT:
if (boost::get_system_time() > start + boost::posix_time::seconds(1))
{
- std::cerr << self.repr << ", waiting for you to finish." << std::endl;
+ std::cerr << repr << ", waiting for you to finish." << std::endl;
state = BLOCK_CLEANUP_WARN;
}
break;
@@ -79,7 +91,7 @@ static void wait_block_cleanup(ElementImpl &self)
case BLOCK_CLEANUP_WARN:
if (boost::get_system_time() > start + boost::posix_time::seconds(2))
{
- std::cerr << self.repr << ", give up the thread context!" << std::endl;
+ std::cerr << repr << ", give up the thread context!" << std::endl;
state = BLOCK_CLEANUP_DAMN;
}
break;
@@ -87,7 +99,7 @@ static void wait_block_cleanup(ElementImpl &self)
case BLOCK_CLEANUP_DAMN:
if (boost::get_system_time() > start + boost::posix_time::seconds(3))
{
- std::cerr << self.repr << " FAIL; application will now hang..." << std::endl;
+ std::cerr << repr << " FAIL; application will now hang..." << std::endl;
state = BLOCK_CLEANUP_DOTS;
}
break;
@@ -100,10 +112,10 @@ static void wait_block_cleanup(ElementImpl &self)
void ElementImpl::block_cleanup(void)
{
//wait for actor to chew through enqueued messages
- wait_block_cleanup(*this);
+ wait_actor_idle(this->repr, *this->block_actor);
//delete the actor
- this->block.reset();
+ this->block_actor.reset();
//unref actor's framework
this->thread_pool.reset(); //must be deleted after actor
@@ -132,34 +144,34 @@ typename V::value_type &vector_get_resize(V &v, const size_t index)
InputPortConfig &Block::input_config(const size_t which_input)
{
- return vector_get_resize((*this)->block->input_configs, which_input);
+ return vector_get_resize((*this)->block_data->input_configs, which_input);
}
const InputPortConfig &Block::input_config(const size_t which_input) const
{
- return vector_get_const((*this)->block->input_configs, which_input);
+ return vector_get_const((*this)->block_data->input_configs, which_input);
}
OutputPortConfig &Block::output_config(const size_t which_output)
{
- return vector_get_resize((*this)->block->output_configs, which_output);
+ return vector_get_resize((*this)->block_data->output_configs, which_output);
}
const OutputPortConfig &Block::output_config(const size_t which_output) const
{
- return vector_get_const((*this)->block->output_configs, which_output);
+ return vector_get_const((*this)->block_data->output_configs, which_output);
}
void Block::commit_config(void)
{
- Theron::Actor &actor = *((*this)->block);
- for (size_t i = 0; i < (*this)->block->get_num_inputs(); i++)
+ Theron::Actor &actor = *((*this)->block_actor);
+ for (size_t i = 0; i < (*this)->worker->get_num_inputs(); i++)
{
InputUpdateMessage message;
message.index = i;
actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
}
- for (size_t i = 0; i < (*this)->block->get_num_outputs(); i++)
+ for (size_t i = 0; i < (*this)->worker->get_num_outputs(); i++)
{
OutputUpdateMessage message;
message.index = i;
@@ -183,12 +195,20 @@ void Block::notify_topology(const size_t, const size_t)
return;
}
+void Block::set_thread_pool(const ThreadPool &thread_pool)
+{
+ boost::shared_ptr<BlockActor> old_actor = (*this)->block_actor;
+ (*this)->block_actor.reset(new BlockActor(thread_pool));
+ (*this)->setup_actor();
+ wait_actor_idle((*this)->repr, *old_actor);
+}
+
void Block::set_buffer_affinity(const long affinity)
{
- (*this)->block->buffer_affinity = affinity;
+ (*this)->block_data->buffer_affinity = affinity;
}
void Block::set_interruptible_work(const bool enb)
{
- (*this)->block->interruptible_work = enb;
+ (*this)->block_data->interruptible_work = enb;
}
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index 464167f..9d417fa 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -34,8 +34,8 @@ ThreadPool get_active_thread_pool(void)
* Block actor construction - gets active framework
**********************************************************************/
-BlockActor::BlockActor(void):
- Apology::Worker(*get_active_thread_pool())
+BlockActor::BlockActor(const ThreadPool &tp):
+ Theron::Actor((tp)? *tp : *get_active_thread_pool())
{
const char * gras_tpp = getenv("GRAS_TPP");
if (gras_tpp != NULL)
@@ -50,6 +50,7 @@ BlockActor::BlockActor(void):
active_thread_pool.reset(); //actors hold this, now its safe to reset, weak_framework only
}
this->register_handlers();
+ this->prio_token = Token::make();
}
BlockActor::~BlockActor(void)
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 0dda99a..aa97405 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -58,14 +58,14 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
MESSAGE_TRACER();
//allocate output buffers which will also wake up the task
- const size_t num_outputs = this->get_num_outputs();
+ const size_t num_outputs = worker->get_num_outputs();
for (size_t i = 0; i < num_outputs; i++)
{
const size_t bytes = recommend_length(
- this->output_allocation_hints[i],
- my_round_up_mult(AT_LEAST_BYTES, this->output_configs[i].item_size),
- this->output_configs[i].reserve_items*this->output_configs[i].item_size,
- this->output_configs[i].maximum_items*this->output_configs[i].item_size
+ data->output_allocation_hints[i],
+ my_round_up_mult(AT_LEAST_BYTES, data->output_configs[i].item_size),
+ data->output_configs[i].reserve_items*data->output_configs[i].item_size,
+ data->output_configs[i].maximum_items*data->output_configs[i].item_size
);
SBufferDeleter deleter = boost::bind(&buffer_returner, this->thread_pool, this->GetAddress(), i, _1);
@@ -74,11 +74,11 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
SBufferConfig config;
config.memory = NULL;
config.length = bytes;
- config.affinity = this->buffer_affinity;
+ config.affinity = data->buffer_affinity;
config.token = token;
- BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, config);
- this->output_queues.set_buffer_queue(i, queue);
+ BufferQueueSptr queue = data->block->output_buffer_allocator(i, config);
+ data->output_queues.set_buffer_queue(i, queue);
InputAllocMessage message;
//new token for the downstream allocator
@@ -87,7 +87,7 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
config.token = token;
message.config = config;
message.token = token;
- this->post_downstream(i, message);
+ worker->post_downstream(i, message);
}
this->Send(0, from); //ACK
diff --git a/lib/block_consume.cpp b/lib/block_consume.cpp
index efdb07e..421b3bc 100644
--- a/lib/block_consume.cpp
+++ b/lib/block_consume.cpp
@@ -10,26 +10,26 @@ using namespace gras;
void Block::consume(const size_t which_input, const size_t num_items)
{
ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
- (*this)->block->consume(which_input, num_items);
+ (*this)->block_actor->consume(which_input, num_items);
}
void Block::consume(const size_t num_items)
{
- const size_t num_inputs = (*this)->block->get_num_inputs();
+ const size_t num_inputs = (*this)->worker->get_num_inputs();
for (size_t i = 0; i < num_inputs; i++)
{
- (*this)->block->consume(i, num_items);
+ (*this)->block_actor->consume(i, num_items);
}
}
item_index_t Block::get_consumed(const size_t which_input)
{
- return (*this)->block->stats.items_consumed[which_input];
+ return (*this)->block_data->stats.items_consumed[which_input];
}
SBuffer Block::get_input_buffer(const size_t which_input) const
{
- return (*this)->block->input_queues.front(which_input);
+ return (*this)->block_data->input_queues.front(which_input);
}
GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items)
@@ -37,8 +37,8 @@ GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " consume " << items << std::endl;
#endif
- this->stats.items_consumed[i] += items;
- const size_t bytes = items*this->input_configs[i].item_size;
- this->input_queues.consume(i, bytes);
+ data->stats.items_consumed[i] += items;
+ const size_t bytes = items*data->input_configs[i].item_size;
+ data->input_queues.consume(i, bytes);
this->trim_tags(i);
}
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index e0ee67a..c52d974 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -13,12 +13,12 @@ void BlockActor::handle_top_active(
){
MESSAGE_TRACER();
- if (this->block_state != BLOCK_STATE_LIVE)
+ if (data->block_state != BLOCK_STATE_LIVE)
{
- this->block_ptr->notify_active();
- this->stats.start_time = time_now();
+ data->block->notify_active();
+ data->stats.start_time = time_now();
}
- this->block_state = BLOCK_STATE_LIVE;
+ data->block_state = BLOCK_STATE_LIVE;
this->Send(0, from); //ACK
@@ -43,35 +43,35 @@ void BlockActor::handle_top_token(
MESSAGE_TRACER();
//create input tokens and send allocation hints
- for (size_t i = 0; i < this->get_num_inputs(); i++)
+ for (size_t i = 0; i < worker->get_num_inputs(); i++)
{
- this->input_tokens[i] = Token::make();
- this->inputs_done.reset(i);
+ data->input_tokens[i] = Token::make();
+ data->inputs_done.reset(i);
OutputTokenMessage token_msg;
- token_msg.token = this->input_tokens[i];
- this->post_upstream(i, token_msg);
+ token_msg.token = data->input_tokens[i];
+ worker->post_upstream(i, token_msg);
//TODO, schedule this message as a pre-allocation message
//tell the upstream about the input requirements
OutputHintMessage output_hints;
- output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size;
- output_hints.token = this->input_tokens[i];
- this->post_upstream(i, output_hints);
+ output_hints.reserve_bytes = data->input_configs[i].reserve_items*data->input_configs[i].item_size;
+ output_hints.token = data->input_tokens[i];
+ worker->post_upstream(i, output_hints);
}
//create output token
- for (size_t i = 0; i < this->get_num_outputs(); i++)
+ for (size_t i = 0; i < worker->get_num_outputs(); i++)
{
- this->output_tokens[i] = Token::make();
- this->outputs_done.reset(i);
+ data->output_tokens[i] = Token::make();
+ data->outputs_done.reset(i);
InputTokenMessage token_msg;
- token_msg.token = this->output_tokens[i];
- this->post_downstream(i, token_msg);
+ token_msg.token = data->output_tokens[i];
+ worker->post_downstream(i, token_msg);
}
//store a token to the top level topology
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
this->Send(0, from); //ACK
}
@@ -83,18 +83,18 @@ void BlockActor::handle_top_config(
MESSAGE_TRACER();
//overwrite with global config only if maxium_items is not set (zero)
- for (size_t i = 0; i < this->output_configs.size(); i++)
+ for (size_t i = 0; i < data->output_configs.size(); i++)
{
- if (this->output_configs[i].maximum_items == 0)
+ if (data->output_configs[i].maximum_items == 0)
{
- this->output_configs[i].maximum_items = message.maximum_output_items;
+ data->output_configs[i].maximum_items = message.maximum_output_items;
}
}
//overwrite with global node affinity setting for buffers if not set
- if (this->buffer_affinity == -1)
+ if (data->buffer_affinity == -1)
{
- this->buffer_affinity = message.buffer_affinity;
+ data->buffer_affinity = message.buffer_affinity;
}
this->Send(0, from); //ACK
@@ -109,12 +109,12 @@ void BlockActor::handle_top_thread_group(
//store the topology's thread group
//erase any potentially old lingering threads
//spawn a new thread if this block is a source
- this->thread_group = message;
- this->interruptible_thread.reset(); //erase old one
- if (this->interruptible_work)
+ data->thread_group = message;
+ data->interruptible_thread.reset(); //erase old one
+ if (data->interruptible_work)
{
- this->interruptible_thread = boost::make_shared<InterruptibleThread>(
- this->thread_group, boost::bind(&BlockActor::task_work, this)
+ data->interruptible_thread = boost::make_shared<InterruptibleThread>(
+ data->thread_group, boost::bind(&BlockActor::task_work, this)
);
}
@@ -137,25 +137,25 @@ void BlockActor::handle_get_stats(
//instantaneous states we update here,
//and not interleaved with the rest of the code
- const size_t num_inputs = this->get_num_inputs();
- this->stats.items_enqueued.resize(num_inputs);
- this->stats.tags_enqueued.resize(num_inputs);
- this->stats.msgs_enqueued.resize(num_inputs);
+ const size_t num_inputs = worker->get_num_inputs();
+ data->stats.items_enqueued.resize(num_inputs);
+ data->stats.tags_enqueued.resize(num_inputs);
+ data->stats.msgs_enqueued.resize(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
{
- this->stats.items_enqueued[i] = this->input_queues.get_items_enqueued(i);
- this->stats.tags_enqueued[i] = this->input_tags[i].size();
- this->stats.msgs_enqueued[i] = this->input_msgs[i].size();
+ data->stats.items_enqueued[i] = data->input_queues.get_items_enqueued(i);
+ data->stats.tags_enqueued[i] = data->input_tags[i].size();
+ data->stats.msgs_enqueued[i] = data->input_msgs[i].size();
}
- this->stats.actor_queue_depth = this->GetNumQueuedMessages();
- this->stats.bytes_copied = this->input_queues.bytes_copied;
- this->stats.inputs_idle = this->input_queues.total_idle_times;
- this->stats.outputs_idle = this->output_queues.total_idle_times;
+ data->stats.actor_queue_depth = this->GetNumQueuedMessages();
+ data->stats.bytes_copied = data->input_queues.bytes_copied;
+ data->stats.inputs_idle = data->input_queues.total_idle_times;
+ data->stats.outputs_idle = data->output_queues.total_idle_times;
//create the message reply object
GetStatsMessage message;
- message.block_id = this->block_ptr->get_uid();
- message.stats = this->stats;
+ message.block_id = data->block->get_uid();
+ message.stats = data->stats;
message.stats_time = time_now();
this->Send(message, from); //ACK
diff --git a/lib/block_message.cpp b/lib/block_message.cpp
index ef7dda6..08a3784 100644
--- a/lib/block_message.cpp
+++ b/lib/block_message.cpp
@@ -8,35 +8,35 @@ using namespace gras;
void Block::post_output_tag(const size_t which_output, const Tag &tag)
{
- (*this)->block->stats.tags_produced[which_output]++;
- (*this)->block->post_downstream(which_output, InputTagMessage(tag));
+ (*this)->block_data->stats.tags_produced[which_output]++;
+ (*this)->worker->post_downstream(which_output, InputTagMessage(tag));
}
void Block::_post_output_msg(const size_t which_output, const PMCC &msg)
{
- (*this)->block->stats.msgs_produced[which_output]++;
- (*this)->block->post_downstream(which_output, InputMsgMessage(msg));
+ (*this)->block_data->stats.msgs_produced[which_output]++;
+ (*this)->worker->post_downstream(which_output, InputMsgMessage(msg));
}
TagIter Block::get_input_tags(const size_t which_input)
{
- const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input];
+ const std::vector<Tag> &input_tags = (*this)->block_data->input_tags[which_input];
return TagIter(input_tags.begin(), input_tags.end());
}
PMCC Block::pop_input_msg(const size_t which_input)
{
- std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input];
- size_t &num_read = (*this)->block->num_input_msgs_read[which_input];
+ std::vector<PMCC> &input_msgs = (*this)->block_data->input_msgs[which_input];
+ size_t &num_read = (*this)->block_data->num_input_msgs_read[which_input];
if (num_read >= input_msgs.size()) return PMCC();
PMCC p = input_msgs[num_read++];
- (*this)->block->stats.msgs_consumed[which_input]++;
+ (*this)->block_data->stats.msgs_consumed[which_input]++;
return p;
}
void Block::propagate_tags(const size_t i, const TagIter &iter)
{
- const size_t num_outputs = (*this)->block->get_num_outputs();
+ const size_t num_outputs = (*this)->worker->get_num_outputs();
for (size_t o = 0; o < num_outputs; o++)
{
BOOST_FOREACH(gras::Tag t, iter)
@@ -52,7 +52,7 @@ void Block::post_input_tag(const size_t which_input, const Tag &tag)
{
InputTagMessage message(tag);
message.index = which_input;
- Theron::Actor &actor = *((*this)->block);
+ Theron::Actor &actor = *((*this)->block_actor);
actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
}
@@ -60,7 +60,7 @@ void Block::_post_input_msg(const size_t which_input, const PMCC &msg)
{
InputMsgMessage message(msg);
message.index = which_input;
- Theron::Actor &actor = *((*this)->block);
+ Theron::Actor &actor = *((*this)->block_actor);
actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
}
@@ -69,6 +69,6 @@ void Block::post_input_buffer(const size_t which_input, const SBuffer &buffer)
InputBufferMessage message;
message.index = which_input;
message.buffer = buffer;
- Theron::Actor &actor = *((*this)->block);
+ Theron::Actor &actor = *((*this)->block_actor);
actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
}
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
index 584d3a3..7133321 100644
--- a/lib/block_produce.cpp
+++ b/lib/block_produce.cpp
@@ -9,26 +9,26 @@ using namespace gras;
void Block::produce(const size_t which_output, const size_t num_items)
{
ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
- (*this)->block->produce(which_output, num_items);
+ (*this)->block_actor->produce(which_output, num_items);
}
void Block::produce(const size_t num_items)
{
- const size_t num_outputs = (*this)->block->get_num_outputs();
+ const size_t num_outputs = (*this)->worker->get_num_outputs();
for (size_t o = 0; o < num_outputs; o++)
{
- (*this)->block->produce(o, num_items);
+ (*this)->block_actor->produce(o, num_items);
}
}
item_index_t Block::get_produced(const size_t which_output)
{
- return (*this)->block->stats.items_produced[which_output];
+ return (*this)->block_data->stats.items_produced[which_output];
}
SBuffer Block::get_output_buffer(const size_t which_output) const
{
- SBuffer &buff = (*this)->block->output_queues.front(which_output);
+ SBuffer &buff = (*this)->block_data->output_queues.front(which_output);
//increment length to auto pop full buffer size,
//when user doesnt call pop_output_buffer()
buff.length = buff.get_actual_length();
@@ -37,12 +37,19 @@ SBuffer Block::get_output_buffer(const size_t which_output) const
void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
{
- (*this)->block->output_queues.front(which_output).length = num_bytes;
+ (*this)->block_data->output_queues.front(which_output).length = num_bytes;
}
void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
{
- (*this)->block->produce_buffer(which_output, buffer);
+ boost::shared_ptr<BlockData> &data = (*this)->block_data;
+ data->output_queues.consume(which_output);
+ ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0);
+ const size_t items = buffer.length/data->output_configs[which_output].item_size;
+ data->stats.items_produced[which_output] += items;
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buffer;
+ (*this)->worker->post_downstream(which_output, buff_msg);
}
GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
@@ -50,21 +57,10 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " produce " << items << std::endl;
#endif
- SBuffer &buff = this->output_queues.front(i);
- ASSERT((buff.length % output_configs[i].item_size) == 0);
- this->stats.items_produced[i] += items;
- const size_t bytes = items*this->output_configs[i].item_size;
+ SBuffer &buff = data->output_queues.front(i);
+ ASSERT((buff.length % data->output_configs[i].item_size) == 0);
+ data->stats.items_produced[i] += items;
+ const size_t bytes = items*data->output_configs[i].item_size;
buff.length += bytes;
- this->produce_outputs[i] = true;
-}
-
-GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
-{
- this->output_queues.consume(i);
- ASSERT((buffer.length % output_configs[i].item_size) == 0);
- const size_t items = buffer.length/output_configs[i].item_size;
- this->stats.items_produced[i] += items;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buffer;
- this->post_downstream(i, buff_msg);
+ data->produce_outputs[i] = true;
}
diff --git a/lib/block_props.cpp b/lib/block_props.cpp
index b3456ac..a85d948 100644
--- a/lib/block_props.cpp
+++ b/lib/block_props.cpp
@@ -24,7 +24,7 @@ void BlockActor::handle_prop_access(
//call into the handler overload to do the property access
try
{
- reply.value = block_ptr->_handle_prop_access(message.key, message.value, message.set);
+ reply.value = data->block->_handle_prop_access(message.key, message.value, message.set);
}
catch (const std::exception &e)
{
@@ -45,7 +45,7 @@ void BlockActor::handle_prop_access(
PMCC Block::_handle_prop_access(const std::string &key, const PMCC &value, const bool set)
{
- const PropertyRegistryPair &pair = (*this)->block->property_registry[key];
+ const PropertyRegistryPair &pair = (*this)->block_data->property_registry[key];
PropertyRegistrySptr pr = (set)? pair.setter : pair.getter;
if (not pr) throw std::invalid_argument("no property registered for key: " + key);
if (set)
@@ -96,20 +96,20 @@ PMCC BlockActor::prop_access_dispatcher(const std::string &key, const PMCC &valu
void Block::_register_getter(const std::string &key, void *pr)
{
- (*this)->block->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr));
+ (*this)->block_data->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr));
}
void Block::_register_setter(const std::string &key, void *pr)
{
- (*this)->block->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr));
+ (*this)->block_data->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr));
}
void Block::_set_property(const std::string &key, const PMCC &value)
{
- (*this)->block->prop_access_dispatcher(key, value, true);
+ (*this)->block_actor->prop_access_dispatcher(key, value, true);
}
PMCC Block::_get_property(const std::string &key)
{
- return (*this)->block->prop_access_dispatcher(key, PMCC(), false);
+ return (*this)->block_actor->prop_access_dispatcher(key, PMCC(), false);
}
diff --git a/lib/element.cpp b/lib/element.cpp
index c06d96d..4b73bd4 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -61,7 +61,7 @@ ElementImpl::~ElementImpl(void)
{
if (this->executor) this->top_block_cleanup();
if (this->topology) this->hier_block_cleanup();
- if (this->block) this->block_cleanup();
+ if (this->worker) this->block_cleanup();
}
void Element::set_container(WeakContainer *container)
@@ -138,5 +138,5 @@ Block *Element::locate_block(const std::string &path)
}
//return block ptr as result
- return elem->block->block_ptr;
+ return elem->block_data->block;
}
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 477b478..23f7127 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -4,6 +4,7 @@
#define INCLUDED_LIBGRAS_ELEMENT_IMPL_HPP
#include <gras_impl/block_actor.hpp>
+#include <Apology/Worker.hpp>
#include <Apology/Topology.hpp>
#include <Apology/Executor.hpp>
#include <gras/element.hpp>
@@ -18,6 +19,8 @@ namespace gras
struct ElementImpl
{
+ //setup stuff
+ void setup_actor(void);
//deconstructor stuff
~ElementImpl(void);
@@ -42,25 +45,29 @@ struct ElementImpl
std::map<std::string, Element> children;
//things may be in this element
+ boost::shared_ptr<Apology::Worker> worker;
boost::shared_ptr<Apology::Topology> topology;
boost::shared_ptr<Apology::Executor> executor;
- boost::shared_ptr<BlockActor> block;
+ boost::shared_ptr<BlockActor> block_actor;
+ boost::shared_ptr<BlockData> block_data;
ThreadPool thread_pool;
Apology::Base *get_elem(void) const
{
- if (block) return block.get();
- return topology.get();
+ if (worker) return worker.get();
+ if (topology) return topology.get();
+ else throw std::runtime_error("ElementImpl::get_elem fail");
}
template <typename MessageType>
void bcast_prio_msg(const MessageType &msg)
{
Theron::Receiver receiver;
- BOOST_FOREACH(Apology::Worker *worker, this->executor->get_workers())
+ BOOST_FOREACH(Apology::Worker *w, this->executor->get_workers())
{
+ BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
MessageType message = msg;
- message.prio_token = dynamic_cast<BlockActor *>(worker)->prio_token;
- worker->GetFramework().Send(message, receiver.GetAddress(), worker->GetAddress());
+ message.prio_token = actor->prio_token;
+ actor->GetFramework().Send(message, receiver.GetAddress(), actor->GetAddress());
}
size_t outstandingCount(this->executor->get_workers().size());
while (outstandingCount != 0)
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index a7ed6c5..90e96bb 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -4,40 +4,25 @@
#define INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP
#include <gras_impl/debug.hpp>
-#include <gras_impl/bitset.hpp>
-#include <gras/gras.hpp>
#include <gras/block.hpp>
#include <gras/top_block.hpp>
#include <gras/thread_pool.hpp>
#include <Apology/Worker.hpp>
-#include <gras_impl/token.hpp>
-#include <gras_impl/stats.hpp>
#include <gras_impl/messages.hpp>
-#include <gras_impl/output_buffer_queues.hpp>
-#include <gras_impl/input_buffer_queues.hpp>
-#include <gras_impl/interruptible_thread.hpp>
-#include <vector>
-#include <set>
-#include <map>
+#include <gras_impl/block_data.hpp>
namespace gras
{
-typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr;
-struct PropertyRegistryPair
+struct BlockActor : Theron::Actor
{
- PropertyRegistrySptr setter;
- PropertyRegistrySptr getter;
-};
-
-struct BlockActor : Apology::Worker
-{
- BlockActor(void);
+ BlockActor(const ThreadPool &tp = ThreadPool());
~BlockActor(void);
- Block *block_ptr;
std::string name; //for debug
ThreadPool thread_pool;
Token prio_token;
+ boost::shared_ptr<BlockData> data;
+ Apology::Worker *worker;
//do it here so we can match w/ the handler declarations
void register_handlers(void)
@@ -110,68 +95,19 @@ struct BlockActor : Apology::Worker
void trim_msgs(const size_t index);
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
- void produce_buffer(const size_t index, const SBuffer &buffer);
void task_kicker(void);
void update_input_avail(const size_t index);
bool is_input_done(const size_t index);
bool is_work_allowed(void);
- //per port properties
- std::vector<InputPortConfig> input_configs;
- std::vector<OutputPortConfig> output_configs;
-
- //work buffers for the new work interface
- Block::InputItems input_items;
- Block::OutputItems output_items;
-
- //track the subscriber counts
- std::vector<Token> input_tokens;
- std::vector<Token> output_tokens;
- BitSet inputs_done;
- BitSet outputs_done;
- std::set<Token> token_pool;
-
- //buffer queues and ready conditions
- InputBufferQueues input_queues;
- OutputBufferQueues output_queues;
- std::vector<bool> produce_outputs;
- BitSet inputs_available;
- std::vector<time_ticks_t> time_input_not_ready;
- std::vector<time_ticks_t> time_output_not_ready;
-
- //tag and msg tracking
- std::vector<bool> input_tags_changed;
- std::vector<std::vector<Tag> > input_tags;
- std::vector<size_t> num_input_msgs_read;
- std::vector<std::vector<PMCC> > input_msgs;
-
- //interruptible thread stuff
- bool interruptible_work;
- SharedThreadGroup thread_group;
- boost::shared_ptr<InterruptibleThread> interruptible_thread;
-
//work helpers
inline void task_work(void)
{
- block_ptr->work(this->input_items, this->output_items);
+ data->block->work(data->input_items, data->output_items);
}
- //is the fg running?
- enum
- {
- BLOCK_STATE_INIT,
- BLOCK_STATE_LIVE,
- BLOCK_STATE_DONE,
- } block_state;
- long buffer_affinity;
-
- std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
-
//property stuff
- std::map<std::string, PropertyRegistryPair> property_registry;
PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set);
-
- BlockStats stats;
};
//-------------- common functions from this BlockActor class ---------//
@@ -183,27 +119,27 @@ GRAS_FORCE_INLINE void BlockActor::task_kicker(void)
GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
{
- const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i);
- const bool has_input_msgs = not this->input_msgs[i].empty();
- this->inputs_available.set(i, has_input_bufs or has_input_msgs);
- this->input_queues.update_has_msg(i, has_input_msgs);
+ const bool has_input_bufs = not data->input_queues.empty(i) and data->input_queues.ready(i);
+ const bool has_input_msgs = not data->input_msgs[i].empty();
+ data->inputs_available.set(i, has_input_bufs or has_input_msgs);
+ data->input_queues.update_has_msg(i, has_input_msgs);
}
GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
{
- const bool force_done = this->input_configs[i].force_done;
- if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i];
- return this->inputs_done.all() and this->inputs_available.none();
+ const bool force_done = data->input_configs[i].force_done;
+ if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i];
+ return data->inputs_done.all() and data->inputs_available.none();
}
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
{
return (
this->prio_token.unique() and
- this->block_state == BLOCK_STATE_LIVE and
- this->inputs_available.any() and
- this->input_queues.all_ready() and
- this->output_queues.all_ready()
+ data->block_state == BLOCK_STATE_LIVE and
+ data->inputs_available.any() and
+ data->input_queues.all_ready() and
+ data->output_queues.all_ready()
);
}
diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp
new file mode 100644
index 0000000..4b6e8de
--- /dev/null
+++ b/lib/gras_impl/block_data.hpp
@@ -0,0 +1,88 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#ifndef INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP
+#define INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP
+
+#include <gras/block.hpp>
+#include <gras_impl/debug.hpp>
+#include <gras_impl/bitset.hpp>
+#include <gras_impl/token.hpp>
+#include <gras_impl/stats.hpp>
+#include <gras_impl/output_buffer_queues.hpp>
+#include <gras_impl/input_buffer_queues.hpp>
+#include <gras_impl/interruptible_thread.hpp>
+#include <vector>
+#include <set>
+#include <map>
+
+namespace gras
+{
+
+typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr;
+struct PropertyRegistryPair
+{
+ PropertyRegistrySptr setter;
+ PropertyRegistrySptr getter;
+};
+
+enum BlockState
+{
+ BLOCK_STATE_INIT,
+ BLOCK_STATE_LIVE,
+ BLOCK_STATE_DONE,
+};
+
+struct BlockData
+{
+ //block pointer to call into parent
+ Block *block;
+
+ //per port properties
+ std::vector<InputPortConfig> input_configs;
+ std::vector<OutputPortConfig> output_configs;
+
+ //work buffers for the new work interface
+ Block::InputItems input_items;
+ Block::OutputItems output_items;
+
+ //track the subscriber counts
+ std::vector<Token> input_tokens;
+ std::vector<Token> output_tokens;
+ BitSet inputs_done;
+ BitSet outputs_done;
+ std::set<Token> token_pool;
+
+ //buffer queues and ready conditions
+ InputBufferQueues input_queues;
+ OutputBufferQueues output_queues;
+ std::vector<bool> produce_outputs;
+ BitSet inputs_available;
+ std::vector<time_ticks_t> time_input_not_ready;
+ std::vector<time_ticks_t> time_output_not_ready;
+
+ //tag and msg tracking
+ std::vector<bool> input_tags_changed;
+ std::vector<std::vector<Tag> > input_tags;
+ std::vector<size_t> num_input_msgs_read;
+ std::vector<std::vector<PMCC> > input_msgs;
+
+ //interruptible thread stuff
+ bool interruptible_work;
+ SharedThreadGroup thread_group;
+ boost::shared_ptr<InterruptibleThread> interruptible_thread;
+
+ //is the fg running?
+ BlockState block_state;
+ long buffer_affinity;
+
+ std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
+
+ //property stuff
+ std::map<std::string, PropertyRegistryPair> property_registry;
+
+ BlockStats stats;
+};
+
+} //namespace gras
+
+#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_DATA_HPP*/
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index 04480d8..eb09090 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -48,7 +48,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
#ifdef MESSAGE_TRACING
-#define MESSAGE_TRACER() std::cerr << block_ptr->to_string() << " in " << BOOST_CURRENT_FUNCTION << std::endl << std::flush;
+#define MESSAGE_TRACER() std::cerr << name << " in " << BOOST_CURRENT_FUNCTION << std::endl << std::flush;
#else
#define MESSAGE_TRACER()
#endif
diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp
index 758af0c..abbdec1 100644
--- a/lib/hier_block.cpp
+++ b/lib/hier_block.cpp
@@ -15,7 +15,7 @@ HierBlock::HierBlock(void)
HierBlock::HierBlock(const std::string &name):
Element(name)
{
- (*this)->topology = boost::shared_ptr<Apology::Topology>(new Apology::Topology());
+ (*this)->topology.reset(new Apology::Topology());
}
HierBlock::~HierBlock(void)
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index faddda8..89e1d34 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -7,25 +7,25 @@ using namespace gras;
void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//handle incoming stream tag, push into the tag storage
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->input_tags[index].push_back(message.tag);
- this->input_tags_changed[index] = true;
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->input_tags[index].push_back(message.tag);
+ data->input_tags_changed[index] = true;
}
void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//handle incoming async message, push into the msg storage
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->input_msgs[index].push_back(message.msg);
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->input_msgs[index].push_back(message.msg);
this->update_input_avail(index);
ta.done();
@@ -34,13 +34,13 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::
void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//handle incoming stream buffer, push into the queue
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->input_queues.push(index, message.buffer);
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->input_queues.push(index, message.buffer);
this->update_input_avail(index);
ta.done();
@@ -49,22 +49,22 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
- ASSERT(message.index < this->get_num_inputs());
+ ASSERT(message.index < worker->get_num_inputs());
//store the token of the upstream producer
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
}
void BlockActor::handle_input_check(const InputCheckMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//an upstream block declared itself done, recheck the token
- this->inputs_done.set(index, this->input_tokens[index].unique());
+ data->inputs_done.set(index, data->input_tokens[index].unique());
//upstream done, give it one more attempt at task handling
ta.done();
@@ -76,26 +76,26 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//handle the upstream block allocation request
OutputAllocMessage new_msg;
- new_msg.queue = block_ptr->input_buffer_allocator(index, message.config);
- if (new_msg.queue) this->post_upstream(index, new_msg);
+ new_msg.queue = data->block->input_buffer_allocator(index, message.config);
+ if (new_msg.queue) worker->post_upstream(index, new_msg);
}
void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t i = message.index;
//update buffer queue configuration
- if (i >= this->input_queues.size()) return;
- const size_t preload_bytes = this->input_configs[i].item_size*this->input_configs[i].preload_items;
- const size_t reserve_bytes = this->input_configs[i].item_size*this->input_configs[i].reserve_items;
- const size_t maximum_bytes = this->input_configs[i].item_size*this->input_configs[i].maximum_items;
- this->input_queues.update_config(i, this->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes);
+ if (i >= data->input_queues.size()) return;
+ const size_t preload_bytes = data->input_configs[i].item_size*data->input_configs[i].preload_items;
+ const size_t reserve_bytes = data->input_configs[i].item_size*data->input_configs[i].reserve_items;
+ const size_t maximum_bytes = data->input_configs[i].item_size*data->input_configs[i].maximum_items;
+ data->input_queues.update_config(i, data->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes);
}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index d937bbf..42dfe39 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -8,14 +8,14 @@ using namespace gras;
void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//a buffer has returned from the downstream
//(all interested consumers have finished with it)
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->output_queues.push(index, message.buffer);
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->output_queues.push(index, message.buffer);
ta.done();
this->task_main();
@@ -23,23 +23,23 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const
void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
- ASSERT(message.index < this->get_num_outputs());
+ ASSERT(message.index < worker->get_num_outputs());
//store the token of the downstream consumer
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
}
void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//a downstream block has declared itself done, recheck the token
- this->outputs_done.set(index, this->output_tokens[index].unique());
- if (this->outputs_done.all()) //no downstream subscribers?
+ data->outputs_done.set(index, data->output_tokens[index].unique());
+ if (data->outputs_done.all()) //no downstream subscribers?
{
this->mark_done();
}
@@ -47,7 +47,7 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th
void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
@@ -57,7 +57,7 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther
//remove any old hints with expired token
//remove any older hints with matching token
std::vector<OutputHintMessage> hints;
- BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index])
+ BOOST_FOREACH(const OutputHintMessage &hint, data->output_allocation_hints[index])
{
if (hint.token.expired()) continue;
if (hint.token.lock() == message.token.lock()) continue;
@@ -67,27 +67,27 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther
//store the new hint as well
hints.push_back(message);
- this->output_allocation_hints[index] = hints;
+ data->output_allocation_hints[index] = hints;
}
void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//return of a positive downstream allocation
- this->output_queues.set_buffer_queue(index, message.queue);
+ data->output_queues.set_buffer_queue(index, message.queue);
}
void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t i = message.index;
//update buffer queue configuration
- if (i >= this->output_queues.size()) return;
- const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items;
- this->output_queues.set_reserve_bytes(i, reserve_bytes);
+ if (i >= data->output_queues.size()) return;
+ const size_t reserve_bytes = data->output_configs[i].item_size*data->output_configs[i].reserve_items;
+ data->output_queues.set_reserve_bytes(i, reserve_bytes);
}
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index 8d504dd..a52b70e 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -11,10 +11,10 @@ namespace gras
GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i)
{
- if GRAS_LIKELY(not this->input_tags_changed[i]) return;
- std::vector<Tag> &tags_i = this->input_tags[i];
+ if GRAS_LIKELY(not data->input_tags_changed[i]) return;
+ std::vector<Tag> &tags_i = data->input_tags[i];
std::sort(tags_i.begin(), tags_i.end());
- this->input_tags_changed[i] = false;
+ data->input_tags_changed[i] = false;
}
GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
@@ -24,8 +24,8 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//-- and post trimmed tags to the downstream based on policy
//------------------------------------------------------------------
- std::vector<Tag> &tags_i = this->input_tags[i];
- const item_index_t items_consumed_i = this->stats.items_consumed[i];
+ std::vector<Tag> &tags_i = data->input_tags[i];
+ const item_index_t items_consumed_i = data->stats.items_consumed[i];
size_t last = 0;
while (last < tags_i.size() and tags_i[last].offset < items_consumed_i)
{
@@ -35,19 +35,19 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
if GRAS_LIKELY(last == 0) return;
//call the overloaded propagate_tags to do the dirty work
- this->block_ptr->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last));
+ data->block->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last));
//now its safe to perform the erasure
tags_i.erase(tags_i.begin(), tags_i.begin()+last);
- this->stats.tags_consumed[i] += last;
+ data->stats.tags_consumed[i] += last;
}
GRAS_FORCE_INLINE void BlockActor::trim_msgs(const size_t i)
{
- const size_t num_read = this->num_input_msgs_read[i];
+ const size_t num_read = data->num_input_msgs_read[i];
if GRAS_UNLIKELY(num_read > 0)
{
- std::vector<PMCC> &input_msgs = this->input_msgs[i];
+ std::vector<PMCC> &input_msgs = data->input_msgs[i];
input_msgs.erase(input_msgs.begin(), input_msgs.begin()+num_read);
}
}
diff --git a/lib/task_done.cpp b/lib/task_done.cpp
index d116202..999fd2c 100644
--- a/lib/task_done.cpp
+++ b/lib/task_done.cpp
@@ -7,62 +7,62 @@ using namespace gras;
void Block::mark_done(void)
{
- (*this)->block->mark_done();
+ (*this)->block_actor->mark_done();
}
void BlockActor::mark_done(void)
{
- if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ if (data->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
- this->stats.stop_time = time_now();
- this->block_ptr->notify_inactive();
+ data->stats.stop_time = time_now();
+ data->block->notify_inactive();
//flush partial output buffers to the downstream
- for (size_t i = 0; i < this->get_num_outputs(); i++)
+ for (size_t i = 0; i < worker->get_num_outputs(); i++)
{
- if (not this->output_queues.ready(i)) continue;
- SBuffer &buff = this->output_queues.front(i);
+ if (not data->output_queues.ready(i)) continue;
+ SBuffer &buff = data->output_queues.front(i);
if (buff.length == 0) continue;
InputBufferMessage buff_msg;
buff_msg.buffer = buff;
- this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
+ worker->post_downstream(i, buff_msg);
+ data->output_queues.pop(i);
}
- this->interruptible_thread.reset();
+ data->interruptible_thread.reset();
//mark down the new state
- this->block_state = BLOCK_STATE_DONE;
+ data->block_state = BLOCK_STATE_DONE;
//release upstream, downstream, and executor tokens
- this->token_pool.clear();
+ data->token_pool.clear();
//release all buffers in queues
- this->input_queues.flush_all();
- this->output_queues.flush_all();
+ data->input_queues.flush_all();
+ data->output_queues.flush_all();
//release all tags and msgs
- for (size_t i = 0; i < this->get_num_inputs(); i++)
+ for (size_t i = 0; i < worker->get_num_inputs(); i++)
{
- this->input_msgs[i].clear();
- this->input_tags[i].clear();
+ data->input_msgs[i].clear();
+ data->input_tags[i].clear();
}
//tell the upstream and downstram to re-check their tokens
//this is how the other blocks know who is interested,
//and can decide based on interest to set done or not
- for (size_t i = 0; i < this->get_num_inputs(); i++)
+ for (size_t i = 0; i < worker->get_num_inputs(); i++)
{
- this->post_upstream(i, OutputCheckMessage());
+ worker->post_upstream(i, OutputCheckMessage());
}
- for (size_t i = 0; i < this->get_num_outputs(); i++)
+ for (size_t i = 0; i < worker->get_num_outputs(); i++)
{
- this->post_downstream(i, InputCheckMessage());
+ worker->post_downstream(i, InputCheckMessage());
}
if (ARMAGEDDON) std::cerr
<< "==================================================\n"
- << "== The " << block_ptr->to_string() << " is done...\n"
+ << "== The " << name << " is done...\n"
<< "==================================================\n"
<< std::flush;
}
diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp
index 720e2e1..415c13b 100644
--- a/lib/task_fail.cpp
+++ b/lib/task_fail.cpp
@@ -7,52 +7,52 @@ using namespace gras;
void Block::mark_output_fail(const size_t which_output)
{
- (*this)->block->output_fail(which_output);
+ (*this)->block_actor->output_fail(which_output);
}
void Block::mark_input_fail(const size_t which_input)
{
- (*this)->block->input_fail(which_input);
+ (*this)->block_actor->input_fail(which_input);
}
void BlockActor::input_fail(const size_t i)
{
//input failed, accumulate and try again
- if (not this->input_queues.is_accumulated(i))
+ if (not data->input_queues.is_accumulated(i))
{
- this->input_queues.accumulate(i);
+ data->input_queues.accumulate(i);
this->task_kicker();
return;
}
//otherwise check for done, else wait for more
- if (this->inputs_done[i])
+ if (data->inputs_done[i])
{
this->mark_done();
return;
}
//check that the input is not already maxed
- if (this->input_queues.is_front_maximal(i))
+ if (data->input_queues.is_front_maximal(i))
{
throw std::runtime_error("input_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears
- this->input_queues.fail(i);
+ data->input_queues.fail(i);
}
void BlockActor::output_fail(const size_t i)
{
- SBuffer &buff = this->output_queues.front(i);
+ SBuffer &buff = data->output_queues.front(i);
//check that the input is not already maxed
- const size_t front_items = buff.length/this->output_configs[i].item_size;
- if (front_items >= this->output_configs[i].maximum_items)
+ const size_t front_items = buff.length/data->output_configs[i].item_size;
+ if (front_items >= data->output_configs[i].maximum_items)
{
throw std::runtime_error("output_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears
- this->output_queues.fail(i);
+ data->output_queues.fail(i);
}
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
index 01e38ce..cb3e0ee 100644
--- a/lib/task_main.cpp
+++ b/lib/task_main.cpp
@@ -7,7 +7,7 @@ using namespace gras;
void BlockActor::task_main(void)
{
- TimerAccumulate ta_prep(this->stats.total_time_prep);
+ TimerAccumulate ta_prep(data->stats.total_time_prep);
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
@@ -16,40 +16,40 @@ void BlockActor::task_main(void)
//------------------------------------------------------------------
if GRAS_UNLIKELY(not this->is_work_allowed()) return;
- const size_t num_inputs = this->get_num_inputs();
- const size_t num_outputs = this->get_num_outputs();
+ const size_t num_inputs = worker->get_num_inputs();
+ const size_t num_outputs = worker->get_num_outputs();
//------------------------------------------------------------------
//-- initialize input buffers before work
//------------------------------------------------------------------
size_t output_inline_index = 0;
- this->input_items.min() = ~0;
- this->input_items.max() = 0;
+ data->input_items.min() = ~0;
+ data->input_items.max() = 0;
for (size_t i = 0; i < num_inputs; i++)
{
this->sort_tags(i);
- this->num_input_msgs_read[i] = 0;
+ data->num_input_msgs_read[i] = 0;
- ASSERT(this->input_queues.ready(i));
- const SBuffer &buff = this->input_queues.front(i);
+ ASSERT(data->input_queues.ready(i));
+ const SBuffer &buff = data->input_queues.front(i);
const void *mem = buff.get();
- size_t items = buff.length/this->input_configs[i].item_size;
+ size_t items = buff.length/data->input_configs[i].item_size;
- this->input_items.vec()[i] = mem;
- this->input_items[i].get() = mem;
- this->input_items[i].size() = items;
- this->input_items.min() = std::min(this->input_items.min(), items);
- this->input_items.max() = std::max(this->input_items.max(), items);
+ data->input_items.vec()[i] = mem;
+ data->input_items[i].get() = mem;
+ data->input_items[i].size() = items;
+ data->input_items.min() = std::min(data->input_items.min(), items);
+ data->input_items.max() = std::max(data->input_items.max(), items);
//inline dealings, how and when input buffers can be inlined into output buffers
//*
if GRAS_UNLIKELY(
buff.unique() and
- input_configs[i].inline_buffer and
+ data->input_configs[i].inline_buffer and
output_inline_index < num_outputs and
- buff.get_affinity() == this->buffer_affinity
+ buff.get_affinity() == data->buffer_affinity
){
- this->output_queues.set_inline(output_inline_index++, buff);
+ data->output_queues.set_inline(output_inline_index++, buff);
}
//*/
}
@@ -57,41 +57,41 @@ void BlockActor::task_main(void)
//------------------------------------------------------------------
//-- initialize output buffers before work
//------------------------------------------------------------------
- this->output_items.min() = ~0;
- this->output_items.max() = 0;
+ data->output_items.min() = ~0;
+ data->output_items.max() = 0;
for (size_t i = 0; i < num_outputs; i++)
{
- ASSERT(this->output_queues.ready(i));
- SBuffer &buff = this->output_queues.front(i);
+ ASSERT(data->output_queues.ready(i));
+ SBuffer &buff = data->output_queues.front(i);
ASSERT(buff.length == 0); //assumes it was flushed last call
void *mem = buff.get();
const size_t bytes = buff.get_actual_length() - buff.offset;
- size_t items = bytes/this->output_configs[i].item_size;
+ size_t items = bytes/data->output_configs[i].item_size;
- this->output_items.vec()[i] = mem;
- this->output_items[i].get() = mem;
- this->output_items[i].size() = items;
- this->output_items.min() = std::min(this->output_items.min(), items);
- this->output_items.max() = std::max(this->output_items.max(), items);
+ data->output_items.vec()[i] = mem;
+ data->output_items[i].get() = mem;
+ data->output_items[i].size() = items;
+ data->output_items.min() = std::min(data->output_items.min(), items);
+ data->output_items.max() = std::max(data->output_items.max(), items);
}
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
ta_prep.done();
- this->stats.work_count++;
- if GRAS_UNLIKELY(this->interruptible_thread)
+ data->stats.work_count++;
+ if GRAS_UNLIKELY(data->interruptible_thread)
{
- TimerAccumulate ta_work(this->stats.total_time_work);
- this->interruptible_thread->call();
+ TimerAccumulate ta_work(data->stats.total_time_work);
+ data->interruptible_thread->call();
}
else
{
- TimerAccumulate ta_work(this->stats.total_time_work);
+ TimerAccumulate ta_work(data->stats.total_time_work);
this->task_work();
}
- this->stats.time_last_work = time_now();
- TimerAccumulate ta_post(this->stats.total_time_post);
+ data->stats.time_last_work = time_now();
+ TimerAccumulate ta_post(data->stats.total_time_post);
//------------------------------------------------------------------
//-- Post-work output tasks
@@ -99,18 +99,18 @@ void BlockActor::task_main(void)
for (size_t i = 0; i < num_outputs; i++)
{
//buffer may be popped by one of the special buffer api hooks
- if GRAS_UNLIKELY(this->output_queues.empty(i)) continue;
+ if GRAS_UNLIKELY(data->output_queues.empty(i)) continue;
//grab a copy of the front buffer then consume from the queue
InputBufferMessage buff_msg;
- buff_msg.buffer = this->output_queues.front(i);
- this->output_queues.consume(i);
+ buff_msg.buffer = data->output_queues.front(i);
+ data->output_queues.consume(i);
//Post a buffer message downstream only if the produce flag was marked.
//So this explicitly after consuming the output queues so pop is called.
//This is because pop may have special hooks in it to prepare the buffer.
- if GRAS_LIKELY(this->produce_outputs[i]) this->post_downstream(i, buff_msg);
- this->produce_outputs[i] = false;
+ if GRAS_LIKELY(data->produce_outputs[i]) worker->post_downstream(i, buff_msg);
+ data->produce_outputs[i] = false;
}
//------------------------------------------------------------------
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 2106a54..12055f8 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -20,7 +20,7 @@ TopBlock::TopBlock(void)
TopBlock::TopBlock(const std::string &name):
HierBlock(name)
{
- (*this)->executor = boost::shared_ptr<Apology::Executor>(new Apology::Executor((*this)->topology.get()));
+ (*this)->executor.reset(new Apology::Executor((*this)->topology.get()));
(*this)->token = Token::make();
(*this)->thread_group = SharedThreadGroup(new boost::thread_group());
if (GENESIS) std::cerr
diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp
index c38fbd9..79dadfb 100644
--- a/lib/top_block_query.cpp
+++ b/lib/top_block_query.cpp
@@ -32,12 +32,12 @@ static ptree query_blocks(ElementImpl *self, const ptree &)
{
ptree root;
ptree e;
- BOOST_FOREACH(Apology::Worker *worker, self->executor->get_workers())
+ BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers())
{
- BlockActor *block = dynamic_cast<BlockActor *>(worker);
+ BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
ptree prop_e;
typedef std::pair<std::string, PropertyRegistryPair> PropRegistryKVP;
- BOOST_FOREACH(const PropRegistryKVP &p, block->property_registry)
+ BOOST_FOREACH(const PropRegistryKVP &p, actor->data->property_registry)
{
ptree prop_attrs;
if (p.second.setter)
@@ -56,7 +56,7 @@ static ptree query_blocks(ElementImpl *self, const ptree &)
block_attrs.push_back(std::make_pair(p.first, prop_attrs));
prop_e.push_back(std::make_pair("props", block_attrs));
}
- e.push_back(std::make_pair(block->block_ptr->get_uid(), prop_e));
+ e.push_back(std::make_pair(actor->data->block->get_uid(), prop_e));
}
root.push_back(std::make_pair("blocks", e));
return root;
@@ -77,16 +77,18 @@ static ptree query_stats(ElementImpl *self, const ptree &query)
//get stats with custom receiver and set high prio
GetStatsReceiver receiver;
size_t outstandingCount(0);
- BOOST_FOREACH(Apology::Worker *worker, self->executor->get_workers())
+ BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers())
{
+ BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
+
//filter workers not needed in query
- const std::string id = dynamic_cast<BlockActor *>(worker)->block_ptr->get_uid();
+ const std::string id = actor->data->block->get_uid();
if (std::find(block_ids.begin(), block_ids.end(), id) == block_ids.end()) continue;
//send a message to the block's actor to query stats
GetStatsMessage message;
- message.prio_token = dynamic_cast<BlockActor *>(worker)->prio_token;
- worker->GetFramework().Send(message, receiver.GetAddress(), worker->GetAddress());
+ message.prio_token = actor->prio_token;
+ actor->GetFramework().Send(message, receiver.GetAddress(), actor->GetAddress());
outstandingCount++;
}
while (outstandingCount) outstandingCount -= receiver.Wait(outstandingCount);
@@ -169,19 +171,19 @@ static ptree query_props(ElementImpl *self, const ptree &query)
const std::string block_id = query.get<std::string>("block");
const std::string prop_key = query.get<std::string>("key");
const bool set = query.count("value") != 0;
- BOOST_FOREACH(Apology::Worker *worker, self->executor->get_workers())
+ BOOST_FOREACH(Apology::Worker *w, self->executor->get_workers())
{
- BlockActor *block = dynamic_cast<BlockActor *>(worker);
- if (block->block_ptr->get_uid() != block_id) continue;
+ BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
+ if (actor->data->block->get_uid() != block_id) continue;
if (set)
{
- const std::type_info &t = block->property_registry[prop_key].setter->type();
+ const std::type_info &t = actor->data->property_registry[prop_key].setter->type();
const PMCC p = ptree_to_pmc(query.get_child("value"), t);
- block->prop_access_dispatcher(prop_key, p, true);
+ actor->prop_access_dispatcher(prop_key, p, true);
}
else
{
- PMCC p = block->prop_access_dispatcher(prop_key, PMC(), false);
+ PMCC p = actor->prop_access_dispatcher(prop_key, PMC(), false);
ptree v = pmc_to_ptree(p);
root.push_back(std::make_pair("block", query.get_child("block")));
root.push_back(std::make_pair("key", query.get_child("key")));
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 38ea138..1586f9d 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -24,49 +24,49 @@ void BlockActor::handle_topology(
){
MESSAGE_TRACER();
- const size_t num_inputs = this->get_num_inputs();
- const size_t num_outputs = this->get_num_outputs();
+ const size_t num_inputs = worker->get_num_inputs();
+ const size_t num_outputs = worker->get_num_outputs();
//call notify_topology on block before committing settings
- this->block_ptr->notify_topology(num_inputs, num_outputs);
+ data->block->notify_topology(num_inputs, num_outputs);
//resize and fill port properties
- resize_fill_back(this->input_configs, num_inputs);
- resize_fill_back(this->output_configs, num_outputs);
+ resize_fill_back(data->input_configs, num_inputs);
+ resize_fill_back(data->output_configs, num_outputs);
//resize the bytes consumed/produced
- resize_fill_grow(this->stats.items_consumed, num_inputs, 0);
- resize_fill_grow(this->stats.tags_consumed, num_inputs, 0);
- resize_fill_grow(this->stats.msgs_consumed, num_inputs, 0);
- resize_fill_grow(this->stats.items_produced, num_outputs, 0);
- resize_fill_grow(this->stats.tags_produced, num_outputs, 0);
- resize_fill_grow(this->stats.msgs_produced, num_outputs, 0);
+ resize_fill_grow(data->stats.items_consumed, num_inputs, 0);
+ resize_fill_grow(data->stats.tags_consumed, num_inputs, 0);
+ resize_fill_grow(data->stats.msgs_consumed, num_inputs, 0);
+ resize_fill_grow(data->stats.items_produced, num_outputs, 0);
+ resize_fill_grow(data->stats.tags_produced, num_outputs, 0);
+ resize_fill_grow(data->stats.msgs_produced, num_outputs, 0);
//resize all work buffers to match current connections
- this->input_items.resize(num_inputs);
- this->output_items.resize(num_outputs);
- this->input_queues.resize(num_inputs);
- this->output_queues.resize(num_outputs);
- this->produce_outputs.resize(num_outputs, false);
- this->inputs_available.resize(num_inputs);
- if (num_inputs == 0) this->inputs_available.resize(1, true); //so its always "available"
+ data->input_items.resize(num_inputs);
+ data->output_items.resize(num_outputs);
+ data->input_queues.resize(num_inputs);
+ data->output_queues.resize(num_outputs);
+ data->produce_outputs.resize(num_outputs, false);
+ data->inputs_available.resize(num_inputs);
+ if (num_inputs == 0) data->inputs_available.resize(1, true); //so its always "available"
//copy the name into the queues for debug purposes
- this->input_queues.name = this->name;
- this->output_queues.name = this->name;
+ data->input_queues.name = this->name;
+ data->output_queues.name = this->name;
//resize the token trackers
- this->input_tokens.resize(num_inputs);
- this->output_tokens.resize(num_outputs);
- this->inputs_done.resize(num_inputs);
- this->outputs_done.resize(num_outputs);
- this->output_allocation_hints.resize(num_outputs);
+ data->input_tokens.resize(num_inputs);
+ data->output_tokens.resize(num_outputs);
+ data->inputs_done.resize(num_inputs);
+ data->outputs_done.resize(num_outputs);
+ data->output_allocation_hints.resize(num_outputs);
//resize tags vector to match sizes
- this->input_tags_changed.resize(num_inputs);
- this->input_tags.resize(num_inputs);
- this->num_input_msgs_read.resize(num_inputs);
- this->input_msgs.resize(num_inputs);
+ data->input_tags_changed.resize(num_inputs);
+ data->input_tags.resize(num_inputs);
+ data->num_input_msgs_read.resize(num_inputs);
+ data->input_msgs.resize(num_inputs);
//a block looses all connections, allow it to free
if (num_inputs == 0 and num_outputs == 0)
diff --git a/tests/thread_pool_test.py b/tests/thread_pool_test.py
index 0f8b513..2765b07 100644
--- a/tests/thread_pool_test.py
+++ b/tests/thread_pool_test.py
@@ -2,6 +2,7 @@
import unittest
import gras
+from demo_blocks import *
class ThreadPoolTest(unittest.TestCase):
@@ -23,5 +24,19 @@ class ThreadPoolTest(unittest.TestCase):
#here we assume prio 0.0 (default) can always be set
self.assertTrue(gras.ThreadPool.test_thread_priority(0.0))
+ def test_migrate_to_thread_pool(self):
+ tb = gras.TopBlock()
+ vec_source = VectorSource(numpy.uint32, [0, 9, 8, 7, 6])
+ vec_sink = VectorSink(numpy.uint32)
+
+ c = gras.ThreadPoolConfig()
+ tp = gras.ThreadPool(c)
+
+ vec_source.set_thread_pool(tp)
+ tb.connect(vec_source, vec_sink)
+ tb.run()
+
+ self.assertEqual(vec_sink.get_vector(), (0, 9, 8, 7, 6))
+
if __name__ == '__main__':
unittest.main()