diff options
author | Josh Blum | 2012-08-30 20:22:44 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-30 20:22:44 -0700 |
commit | a648f0970230203f05a434dba903e6a4a5a08d53 (patch) | |
tree | 9e48ccac26a85dec161f2d8806bf5c1cf650e016 /lib/block_task.cpp | |
parent | 44fb4c04d2ecda3f60fc4f7e31da64cce9ca7a6d (diff) | |
download | sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.gz sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.bz2 sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.zip |
checking in small fixes and message work
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index faf8098..3ec9113 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -22,11 +22,10 @@ using namespace gnuradio; void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) { - if (this->done) return; //can re-enter checking done first + if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first //mark down the new state - this->active = false; - this->done = true; + this->block_state = BLOCK_STATE_DONE; //release upstream, downstream, and executor tokens this->token_pool.clear(); @@ -50,7 +49,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) task_iface.post_downstream(i, CheckTokensMessage()); } - std::cout << "This one: " << name << " is done..." << std::endl; + std::cout + << "==================================================\n" + << "== The " << name << " is done...\n" + << "==================================================\n" + << std::flush; } void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) @@ -60,13 +63,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Handle task may get called for incoming buffers, //-- however, not all ports may have available buffers. //------------------------------------------------------------------ - const bool all_inputs_ready = (~this->inputs_ready).none(); - const bool all_outputs_ready = (~this->outputs_ready).none(); - if (not (this->active and all_inputs_ready and all_outputs_ready)) return; + if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); - const bool is_source = (num_inputs == 0); + //const bool is_source = (num_inputs == 0); //------------------------------------------------------------------ //-- sort the input tags before working @@ -83,7 +84,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Processing time! //------------------------------------------------------------------ - //std::cout << "calling work on " << name << std::endl; + std::cout << "calling work on " << name << std::endl; //reset work trackers for production/consumption size_t input_tokens_count = 0; @@ -93,12 +94,15 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //this->consume_items[i] = 0; ASSERT(this->input_history_items[i] == 0); + ASSERT(not this->input_queues[i].empty()); const tsbe::Buffer &buff = this->input_queues[i].front(); char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; const size_t items = bytes/this->input_items_sizes[i]; + ASSERT(this->input_buff_offsets[i] < buff.get_length()); + this->input_items[i]._mem = mem; this->input_items[i]._len = items; this->work_input_items[i] = mem; @@ -112,6 +116,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) output_tokens_count += this->output_tokens[i].use_count(); //this->produce_items[i] = 0; + ASSERT(not this->output_queues[i].empty()); + const tsbe::Buffer &buff = this->output_queues[i].front(); char *mem = ((char *)buff.get_memory()); const size_t bytes = buff.get_length(); @@ -154,7 +160,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const size_t bytes = items*this->input_items_sizes[i]; this->input_buff_offsets[i] += bytes; tsbe::Buffer &buff = this->input_queues[i].front(); - if (buff.get_length() >= this->input_buff_offsets[i]) + + if (buff.get_length() <= this->input_buff_offsets[i]) { this->input_queues[i].pop(); this->inputs_ready.set(i, not this->input_queues[i].empty()); @@ -245,4 +252,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) } this->output_tags[i].clear(); } + + //create a self-kick so we get called again + //TODO, we could just steal this thread context... + if (this->all_io_ready()) this->block.post_msg(SelfKickMessage()); } |