summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-08-30 20:22:44 -0700
committerJosh Blum2012-08-30 20:22:44 -0700
commita648f0970230203f05a434dba903e6a4a5a08d53 (patch)
tree9e48ccac26a85dec161f2d8806bf5c1cf650e016 /lib/block_task.cpp
parent44fb4c04d2ecda3f60fc4f7e31da64cce9ca7a6d (diff)
downloadsandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.gz
sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.bz2
sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.zip
checking in small fixes and message work
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp31
1 files changed, 21 insertions, 10 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index faf8098..3ec9113 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -22,11 +22,10 @@ using namespace gnuradio;
void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
{
- if (this->done) return; //can re-enter checking done first
+ if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
//mark down the new state
- this->active = false;
- this->done = true;
+ this->block_state = BLOCK_STATE_DONE;
//release upstream, downstream, and executor tokens
this->token_pool.clear();
@@ -50,7 +49,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
task_iface.post_downstream(i, CheckTokensMessage());
}
- std::cout << "This one: " << name << " is done..." << std::endl;
+ std::cout
+ << "==================================================\n"
+ << "== The " << name << " is done...\n"
+ << "==================================================\n"
+ << std::flush;
}
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
@@ -60,13 +63,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- Handle task may get called for incoming buffers,
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
- const bool all_inputs_ready = (~this->inputs_ready).none();
- const bool all_outputs_ready = (~this->outputs_ready).none();
- if (not (this->active and all_inputs_ready and all_outputs_ready)) return;
+ if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return;
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
- const bool is_source = (num_inputs == 0);
+ //const bool is_source = (num_inputs == 0);
//------------------------------------------------------------------
//-- sort the input tags before working
@@ -83,7 +84,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- Processing time!
//------------------------------------------------------------------
- //std::cout << "calling work on " << name << std::endl;
+ std::cout << "calling work on " << name << std::endl;
//reset work trackers for production/consumption
size_t input_tokens_count = 0;
@@ -93,12 +94,15 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//this->consume_items[i] = 0;
ASSERT(this->input_history_items[i] == 0);
+ ASSERT(not this->input_queues[i].empty());
const tsbe::Buffer &buff = this->input_queues[i].front();
char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i];
const size_t bytes = buff.get_length() - this->input_buff_offsets[i];
const size_t items = bytes/this->input_items_sizes[i];
+ ASSERT(this->input_buff_offsets[i] < buff.get_length());
+
this->input_items[i]._mem = mem;
this->input_items[i]._len = items;
this->work_input_items[i] = mem;
@@ -112,6 +116,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
output_tokens_count += this->output_tokens[i].use_count();
//this->produce_items[i] = 0;
+ ASSERT(not this->output_queues[i].empty());
+
const tsbe::Buffer &buff = this->output_queues[i].front();
char *mem = ((char *)buff.get_memory());
const size_t bytes = buff.get_length();
@@ -154,7 +160,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const size_t bytes = items*this->input_items_sizes[i];
this->input_buff_offsets[i] += bytes;
tsbe::Buffer &buff = this->input_queues[i].front();
- if (buff.get_length() >= this->input_buff_offsets[i])
+
+ if (buff.get_length() <= this->input_buff_offsets[i])
{
this->input_queues[i].pop();
this->inputs_ready.set(i, not this->input_queues[i].empty());
@@ -245,4 +252,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
}
this->output_tags[i].clear();
}
+
+ //create a self-kick so we get called again
+ //TODO, we could just steal this thread context...
+ if (this->all_io_ready()) this->block.post_msg(SelfKickMessage());
}