summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-08-31 01:18:45 -0700
committerJosh Blum2012-08-31 01:18:45 -0700
commite09df5e1170d4282d89e7b62cc75baa311c18da9 (patch)
tree3d08cc7cd233eb2da94acdaa517b761ddf93f1c2 /lib/block_task.cpp
parenta648f0970230203f05a434dba903e6a4a5a08d53 (diff)
downloadsandhi-e09df5e1170d4282d89e7b62cc75baa311c18da9.tar.gz
sandhi-e09df5e1170d4282d89e7b62cc75baa311c18da9.tar.bz2
sandhi-e09df5e1170d4282d89e7b62cc75baa311c18da9.zip
done logic tweaks, comments, cleanup
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp77
1 files changed, 44 insertions, 33 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 3ec9113..61ed5dc 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -34,8 +34,22 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
this->output_buffer_tokens.clear();
//release all buffers in queues
- this->input_queues.clear();
- this->output_queues.clear();
+ for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ {
+ while (not this->input_queues[i].empty())
+ {
+ this->input_queues[i].pop();
+ }
+ this->inputs_ready.set(i, false);
+ }
+ for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ {
+ while (not this->output_queues[i].empty())
+ {
+ this->output_queues[i].pop();
+ }
+ this->outputs_ready.set(i, false);
+ }
//tell the upstream and downstram to re-check their tokens
//this is how the other blocks know who is interested,
@@ -64,6 +78,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return;
+ //std::cout << "calling work on " << name << std::endl;
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
@@ -81,12 +96,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
}
//------------------------------------------------------------------
- //-- Processing time!
+ //-- initialize input buffers before work
//------------------------------------------------------------------
-
- std::cout << "calling work on " << name << std::endl;
-
- //reset work trackers for production/consumption
size_t input_tokens_count = 0;
for (size_t i = 0; i < num_inputs; i++)
{
@@ -109,6 +120,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->work_ninput_items[i] = items;
}
+ //------------------------------------------------------------------
+ //-- initialize output buffers before work
+ //------------------------------------------------------------------
size_t num_output_items = ~0; //so big that it must std::min
size_t output_tokens_count = 0;
for (size_t i = 0; i < num_outputs; i++)
@@ -129,27 +143,27 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
num_output_items = std::min(num_output_items, items);
}
- //someone upstream or downstream holds no tokens, we are done!
- if (
- (num_inputs != 0 and input_tokens_count == num_inputs) or
- (num_outputs != 0 and output_tokens_count == num_outputs)
- ){
+ //if we have outputs and at least one port has no downstream subscibers, mark done
+ if ((num_outputs != 0 and output_tokens_count == num_outputs)){
this->mark_done(task_iface);
return;
}
- //start with source, this should be EZ
- int ret = 0;
- ret = block_ptr->Work(this->input_items, this->output_items);
- //VAR(ret);
+ //------------------------------------------------------------------
+ //-- forecast (TODO) and work
+ //------------------------------------------------------------------
+ const int ret = block_ptr->Work(this->input_items, this->output_items);
+ const size_t noutput_items = size_t(ret);
+
if (ret == Block::WORK_DONE)
{
this->mark_done(task_iface);
return;
}
- const size_t noutput_items = size_t(ret);
- //now to deal with consumption and production
+ //------------------------------------------------------------------
+ //-- process input consumption
+ //------------------------------------------------------------------
for (size_t i = 0; i < num_inputs; i++)
{
ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE);
@@ -168,6 +182,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->input_buff_offsets[i] = 0;
}
}
+
+ //------------------------------------------------------------------
+ //-- process output production
+ //------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
const size_t items = (ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
@@ -182,19 +200,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->outputs_ready.set(i, not this->output_queues[i].empty());
}
- //0) figure out what we have for input data
- //1) calculate the possible num output items
- //2) take into account the item multiple
- //3) allocate some buffers
- //4) work....
-
- //block_ptr->forecast(100
-
-
- //TODO set deactive when work returns DONE
-
- //TODO source blocks should call work again until exhausted output buffers
-
//------------------------------------------------------------------
//-- trim the input tags that are past the consumption zone
//-- and post trimmed tags to the downstream based on policy
@@ -253,7 +258,13 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->output_tags[i].clear();
}
- //create a self-kick so we get called again
- //TODO, we could just steal this thread context...
+ //if there are inputs, and not all are provided for, and we have an empty queue, mark done
+ if ((num_inputs != 0 and input_tokens_count == num_inputs and not (~this->inputs_ready).none()))
+ {
+ this->mark_done(task_iface);
+ return;
+ }
+
+ //still have IO ready? kick off another task
if (this->all_io_ready()) this->block.post_msg(SelfKickMessage());
}