diff options
author | Josh Blum | 2012-08-31 01:57:44 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-31 01:57:44 -0700 |
commit | ce4dfa1aad38cc2f4336e754d8feb3b245da5c3f (patch) | |
tree | e4e68d537c069f6f0f16afe87d1870e5b84e1615 /lib/block_task.cpp | |
parent | e09df5e1170d4282d89e7b62cc75baa311c18da9 (diff) | |
download | sandhi-ce4dfa1aad38cc2f4336e754d8feb3b245da5c3f.tar.gz sandhi-ce4dfa1aad38cc2f4336e754d8feb3b245da5c3f.tar.bz2 sandhi-ce4dfa1aad38cc2f4336e754d8feb3b245da5c3f.zip |
created vector of queues to simplify repeated logic
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 49 |
1 files changed, 20 insertions, 29 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 61ed5dc..6fb9c54 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -34,22 +34,8 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) this->output_buffer_tokens.clear(); //release all buffers in queues - for (size_t i = 0; i < task_iface.get_num_inputs(); i++) - { - while (not this->input_queues[i].empty()) - { - this->input_queues[i].pop(); - } - this->inputs_ready.set(i, false); - } - for (size_t i = 0; i < task_iface.get_num_outputs(); i++) - { - while (not this->output_queues[i].empty()) - { - this->output_queues[i].pop(); - } - this->outputs_ready.set(i, false); - } + this->input_queues.flush_all(); + this->output_queues.flush_all(); //tell the upstream and downstram to re-check their tokens //this is how the other blocks know who is interested, @@ -77,7 +63,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Handle task may get called for incoming buffers, //-- however, not all ports may have available buffers. //------------------------------------------------------------------ - if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return; + if (not( + this->block_state == BLOCK_STATE_LIVE and + this->input_queues.all_ready() and + this->output_queues.all_ready() + )) return; //std::cout << "calling work on " << name << std::endl; const size_t num_inputs = task_iface.get_num_inputs(); @@ -105,9 +95,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //this->consume_items[i] = 0; ASSERT(this->input_history_items[i] == 0); - ASSERT(not this->input_queues[i].empty()); + ASSERT(this->input_queues.ready(i)); - const tsbe::Buffer &buff = this->input_queues[i].front(); + const tsbe::Buffer &buff = this->input_queues.front(i); char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; const size_t items = bytes/this->input_items_sizes[i]; @@ -130,9 +120,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) output_tokens_count += this->output_tokens[i].use_count(); //this->produce_items[i] = 0; - ASSERT(not this->output_queues[i].empty()); + ASSERT(this->output_queues.ready(i)); - const tsbe::Buffer &buff = this->output_queues[i].front(); + const tsbe::Buffer &buff = this->output_queues.front(i); char *mem = ((char *)buff.get_memory()); const size_t bytes = buff.get_length(); const size_t items = bytes/this->output_items_sizes[i]; @@ -173,12 +163,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; this->input_buff_offsets[i] += bytes; - tsbe::Buffer &buff = this->input_queues[i].front(); + tsbe::Buffer &buff = this->input_queues.front(i); if (buff.get_length() <= this->input_buff_offsets[i]) { - this->input_queues[i].pop(); - this->inputs_ready.set(i, not this->input_queues[i].empty()); + this->input_queues.pop(i); this->input_buff_offsets[i] = 0; } } @@ -193,11 +182,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; - tsbe::Buffer &buff = this->output_queues[i].front(); + tsbe::Buffer &buff = this->output_queues.front(i); buff.get_length() = bytes; task_iface.post_downstream(i, buff); - this->output_queues[i].pop(); - this->outputs_ready.set(i, not this->output_queues[i].empty()); + this->output_queues.pop(i); } //------------------------------------------------------------------ @@ -259,12 +247,15 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) } //if there are inputs, and not all are provided for, and we have an empty queue, mark done - if ((num_inputs != 0 and input_tokens_count == num_inputs and not (~this->inputs_ready).none())) + if (num_inputs != 0 and input_tokens_count == num_inputs and not this->input_queues.all_ready()) { this->mark_done(task_iface); return; } //still have IO ready? kick off another task - if (this->all_io_ready()) this->block.post_msg(SelfKickMessage()); + if (this->input_queues.all_ready() and this->output_queues.all_ready()) + { + this->block.post_msg(SelfKickMessage()); + } } |