summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-08-28 23:05:41 -0700
committerJosh Blum2012-08-28 23:05:41 -0700
commitac3857575c4c762f9a18ee18889740d4360a9aa8 (patch)
tree4526f5647f2e2d93c21d12ae3c524fb7991745b3 /lib/block_task.cpp
parent4044977deba6d64124763836d875b4da2b70eeaf (diff)
downloadsandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.tar.gz
sandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.tar.bz2
sandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.zip
token work w/ messages to implement finite runs
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp52
1 files changed, 51 insertions, 1 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index f156ae0..577fdb7 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -20,8 +20,44 @@
using namespace gnuradio;
+void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface)
+{
+ for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ {
+ while (task_iface.get_input_buffer(i))
+ {
+ task_iface.pop_input_buffer(i);
+ }
+ }
+}
+
+void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
+{
+ if (not this->active) return;
+ this->active = false;
+ this->token_pool.clear();
+ this->token.reset();
+ this->free_inputs(task_iface);
+ for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ {
+ task_iface.post_upstream(i, CheckTokensMessage());
+ }
+ for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ {
+ task_iface.post_downstream(i, CheckTokensMessage());
+ }
+ HERE();
+ VAR(name);
+}
+
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
+ //FIXME in case we get called in the inactive state, assuming done?
+ if (not this->active)
+ {
+ this->free_inputs(task_iface);
+ }
+
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
@@ -53,8 +89,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
std::cout << "calling work on " << name << std::endl;
//reset work trackers for production/consumption
+ size_t input_tokens_count = 0;
for (size_t i = 0; i < num_inputs; i++)
{
+ input_tokens_count += this->input_tokens[i].use_count();
//this->consume_items[i] = 0;
ASSERT(this->input_history_items[i] == 0);
@@ -69,9 +107,12 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
}
+
size_t num_output_items = ~0; //so big that it must std::min
+ size_t output_tokens_count = 0;
for (size_t i = 0; i < num_outputs; i++)
{
+ output_tokens_count += this->output_tokens[i].use_count();
//this->produce_items[i] = 0;
const tsbe::Buffer &buff = task_iface.get_output_buffer(i);
@@ -85,13 +126,22 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
num_output_items = std::min(num_output_items, items);
}
+ //someone upstream or downstream holds no tokens, we are done!
+ if (
+ (num_inputs != 0 and input_tokens_count == num_inputs) or
+ (num_outputs != 0 and output_tokens_count == num_outputs)
+ ){
+ this->mark_done(task_iface);
+ return;
+ }
+
//start with source, this should be EZ
int ret = 0;
ret = block_ptr->Work(this->input_items, this->output_items);
VAR(ret);
if (ret == Block::WORK_DONE)
{
- this->active = false;
+ this->mark_done(task_iface);
return;
}
const size_t noutput_items = size_t(ret);