summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-09-29 14:45:29 -0700
committerJosh Blum2012-09-29 14:45:29 -0700
commitec1677346389ab3b434d81c6bde15321f3dbe209 (patch)
treea4fd8498e64dd90f2fc169a9de747e49e2173830
parentb194049a9fb5ab60f15bfcca1a53e39a42339244 (diff)
downloadsandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.gz
sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.tar.bz2
sandhi-ec1677346389ab3b434d81c6bde15321f3dbe209.zip
create IO subscriber bitset for tracking done
-rw-r--r--lib/block.cpp1
-rw-r--r--lib/block_allocator.cpp6
-rw-r--r--lib/block_handlers.cpp29
-rw-r--r--lib/block_task.cpp33
-rw-r--r--lib/element.cpp2
-rw-r--r--lib/gras_impl/block_actor.hpp9
-rw-r--r--lib/gras_impl/messages.hpp5
-rw-r--r--lib/input_handlers.cpp9
-rw-r--r--lib/output_handlers.cpp3
-rw-r--r--lib/topology_handler.cpp2
10 files changed, 48 insertions, 51 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 21165f2..40b220b 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -28,6 +28,7 @@ Block::Block(const std::string &name):
Element(name)
{
(*this)->block = boost::shared_ptr<BlockActor>(new BlockActor());
+ (*this)->block->name = name; //for debug purposes
//setup some state variables
(*this)->block->topology_init = false;
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index b2da9ea..ad39159 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -74,6 +74,12 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
{
MESSAGE_TRACER();
+ if (this->block_state == BLOCK_STATE_DONE)
+ {
+ this->Send(0, from); //ACK
+ return;
+ }
+
//allocate output buffers which will also wake up the task
const size_t num_outputs = this->get_num_outputs();
this->output_buffer_tokens.resize(num_outputs);
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index f386804..6925a60 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -27,6 +27,12 @@ void BlockActor::handle_top_active(
){
MESSAGE_TRACER();
+ if (this->block_state == BLOCK_STATE_DONE)
+ {
+ this->Send(0, from); //ACK
+ return;
+ }
+
if (this->block_state != BLOCK_STATE_LIVE)
{
this->block_ptr->start();
@@ -61,10 +67,17 @@ void BlockActor::handle_top_token(
){
MESSAGE_TRACER();
+ if (this->block_state == BLOCK_STATE_DONE)
+ {
+ this->Send(0, from); //ACK
+ return;
+ }
+
//create input tokens and send allocation hints
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
this->input_tokens[i] = Token::make();
+ this->inputs_done.reset(i);
OutputTokenMessage token_msg;
token_msg.token = this->input_tokens[i];
this->post_upstream(i, token_msg);
@@ -83,6 +96,7 @@ void BlockActor::handle_top_token(
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
this->output_tokens[i] = Token::make();
+ this->outputs_done.reset(i);
InputTokenMessage token_msg;
token_msg.token = this->output_tokens[i];
this->post_downstream(i, token_msg);
@@ -133,18 +147,3 @@ void BlockActor::handle_self_kick(
MESSAGE_TRACER();
this->handle_task();
}
-
-void BlockActor::handle_check_tokens(
- const CheckTokensMessage &,
- const Theron::Address
-){
- MESSAGE_TRACER();
- if (this->input_queues.all_ready() and not this->forecast_fail)
- {
- this->handle_task();
- }
- else
- {
- this->mark_done();
- }
-}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 761e923..3f012da 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -90,22 +90,17 @@ void BlockActor::handle_task(void)
const size_t num_inputs = this->get_num_inputs();
const size_t num_outputs = this->get_num_outputs();
- //const bool is_source = (num_inputs == 0);
- //const bool is_sink = (num_outputs == 0);
this->work_io_ptr_mask = 0; //reset
//------------------------------------------------------------------
//-- initialize input buffers before work
//------------------------------------------------------------------
size_t num_input_items = REALLY_BIG; //so big that it must std::min
- bool inputs_done = false;
size_t output_inline_index = 0;
for (size_t i = 0; i < num_inputs; i++)
{
this->sort_tags(i);
- inputs_done = inputs_done or this->input_tokens[i].unique();
-
ASSERT(this->input_queues.ready(i));
bool potential_inline;
const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline);
@@ -141,11 +136,8 @@ void BlockActor::handle_task(void)
//-- initialize output buffers before work
//------------------------------------------------------------------
size_t num_output_items = REALLY_BIG; //so big that it must std::min
- bool outputs_done = false;
for (size_t i = 0; i < num_outputs; i++)
{
- outputs_done = outputs_done or this->output_tokens[i].unique();
-
ASSERT(this->output_queues.ready(i));
const SBuffer &buff = this->output_queues.front(i);
void *mem = buff.get(buff.length);
@@ -160,13 +152,6 @@ void BlockActor::handle_task(void)
this->produce_items[i] = 0;
}
- //if we have outputs and at least one port has no downstream subscibers, mark done
- if (outputs_done)
- {
- this->mark_done();
- return;
- }
-
//------------------------------------------------------------------
//-- forecast
//------------------------------------------------------------------
@@ -187,7 +172,7 @@ void BlockActor::handle_task(void)
if (num_output_items) goto forecast_again_you_jerk;
this->forecast_fail = true;
- this->conclusion(inputs_done);
+ this->conclusion();
return;
}
}
@@ -258,17 +243,19 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- Message self based on post-work conditions
//------------------------------------------------------------------
- this->conclusion(inputs_done);
+ this->conclusion();
}
-GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done)
+GRAS_FORCE_INLINE void BlockActor::conclusion(void)
{
- //if there are inputs, and not all are provided for,
- //tell the block to check input queues and handle done
- if (inputs_done)
+
+ //since nothing else is coming in, its safe to mark done
+ if ((~this->inputs_done).none()) //no upstream providers
{
- this->Push(CheckTokensMessage(), Theron::Address());
- return;
+ if (not this->input_queues.all_ready() or this->forecast_fail)
+ {
+ this->mark_done();
+ }
}
//still have IO ready? kick off another task
diff --git a/lib/element.cpp b/lib/element.cpp
index a733d31..c7afb8b 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -34,7 +34,7 @@ Element::Element(const std::string &name)
(*this)->name = name;
(*this)->unique_id = ++unique_id_pool;
- if (GENESIS) std::cerr << "New element: " << name << " " << (*this)->unique_id << std::endl;
+ if (GENESIS) std::cerr << "New element: " << to_string() << std::endl;
//default io signature to something
IOSignature sig; sig.push_back(1);
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 8932ded..60c730f 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -27,6 +27,7 @@
#include <gras_impl/output_buffer_queues.hpp>
#include <gras_impl/input_buffer_queues.hpp>
#include <gras_impl/interruptible_thread.hpp>
+#include <boost/dynamic_bitset.hpp>
#include <vector>
#include <set>
@@ -48,6 +49,7 @@ struct BlockActor : Apology::Worker
BlockActor(void);
~BlockActor(void);
Block *block_ptr;
+ std::string name; //for debug
//do it here so we can match w/ the handler declarations
void register_handlers(void)
@@ -74,7 +76,6 @@ struct BlockActor : Apology::Worker
this->RegisterHandler(this, &BlockActor::handle_output_alloc);
this->RegisterHandler(this, &BlockActor::handle_self_kick);
- this->RegisterHandler(this, &BlockActor::handle_check_tokens);
this->RegisterHandler(this, &BlockActor::handle_update_inputs);
}
@@ -101,7 +102,6 @@ struct BlockActor : Apology::Worker
void handle_output_alloc(const OutputAllocMessage &, const Theron::Address);
void handle_self_kick(const SelfKickMessage &, const Theron::Address);
- void handle_check_tokens(const CheckTokensMessage &, const Theron::Address);
void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address);
//helpers
@@ -110,7 +110,7 @@ struct BlockActor : Apology::Worker
void handle_task(void);
void sort_tags(const size_t index);
void trim_tags(const size_t index);
- void conclusion(const bool);
+ void conclusion(void);
//per port properties
std::vector<size_t> input_items_sizes;
@@ -144,8 +144,9 @@ struct BlockActor : Apology::Worker
//track the subscriber counts
std::vector<Token> input_tokens;
std::vector<Token> output_tokens;
+ boost::dynamic_bitset<> inputs_done;
+ boost::dynamic_bitset<> outputs_done;
std::set<Token> token_pool;
-
std::vector<SBufferToken> output_buffer_tokens;
//buffer queues and ready conditions
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 183befd..ed0ef81 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -137,11 +137,6 @@ struct SelfKickMessage
//empty
};
-struct CheckTokensMessage
-{
- //empty
-};
-
struct UpdateInputsMessage
{
//empty
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index f01ad28..461600f 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -55,9 +55,14 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
const size_t index = message.index;
//an upstream block declared itself done, recheck the token
- if (this->input_queues.empty(index) and this->input_tokens[index].unique())
+ this->inputs_done.set(index, this->input_tokens[index].unique());
+
+ if ((~this->inputs_done).none()) //no upstream providers
{
- this->mark_done();
+ if (not this->input_queues.all_ready())
+ {
+ this->mark_done();
+ }
}
}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index bf63470..fa01b51 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -47,7 +47,8 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th
const size_t index = message.index;
//a downstream block has declared itself done, recheck the token
- if (this->output_tokens[index].unique())
+ this->outputs_done.set(index, this->output_tokens[index].unique());
+ if ((~this->outputs_done).none()) //no downstream subscribers?
{
this->mark_done();
}
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 53c4961..8208fc6 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -60,6 +60,8 @@ void BlockActor::handle_topology(
this->input_tokens.resize(num_inputs);
this->output_tokens.resize(num_outputs);
+ this->inputs_done.resize(num_inputs);
+ this->outputs_done.resize(num_outputs);
this->output_allocation_hints.resize(num_outputs);
//resize tags vector to match sizes