summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-08-31 01:57:44 -0700
committerJosh Blum2012-08-31 01:57:44 -0700
commitce4dfa1aad38cc2f4336e754d8feb3b245da5c3f (patch)
treee4e68d537c069f6f0f16afe87d1870e5b84e1615 /lib/block_task.cpp
parente09df5e1170d4282d89e7b62cc75baa311c18da9 (diff)
downloadsandhi-ce4dfa1aad38cc2f4336e754d8feb3b245da5c3f.tar.gz
sandhi-ce4dfa1aad38cc2f4336e754d8feb3b245da5c3f.tar.bz2
sandhi-ce4dfa1aad38cc2f4336e754d8feb3b245da5c3f.zip
created vector of queues to simplify repeated logic
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp49
1 files changed, 20 insertions, 29 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 61ed5dc..6fb9c54 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -34,22 +34,8 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
this->output_buffer_tokens.clear();
//release all buffers in queues
- for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
- {
- while (not this->input_queues[i].empty())
- {
- this->input_queues[i].pop();
- }
- this->inputs_ready.set(i, false);
- }
- for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
- {
- while (not this->output_queues[i].empty())
- {
- this->output_queues[i].pop();
- }
- this->outputs_ready.set(i, false);
- }
+ this->input_queues.flush_all();
+ this->output_queues.flush_all();
//tell the upstream and downstram to re-check their tokens
//this is how the other blocks know who is interested,
@@ -77,7 +63,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- Handle task may get called for incoming buffers,
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
- if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return;
+ if (not(
+ this->block_state == BLOCK_STATE_LIVE and
+ this->input_queues.all_ready() and
+ this->output_queues.all_ready()
+ )) return;
//std::cout << "calling work on " << name << std::endl;
const size_t num_inputs = task_iface.get_num_inputs();
@@ -105,9 +95,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//this->consume_items[i] = 0;
ASSERT(this->input_history_items[i] == 0);
- ASSERT(not this->input_queues[i].empty());
+ ASSERT(this->input_queues.ready(i));
- const tsbe::Buffer &buff = this->input_queues[i].front();
+ const tsbe::Buffer &buff = this->input_queues.front(i);
char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i];
const size_t bytes = buff.get_length() - this->input_buff_offsets[i];
const size_t items = bytes/this->input_items_sizes[i];
@@ -130,9 +120,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
output_tokens_count += this->output_tokens[i].use_count();
//this->produce_items[i] = 0;
- ASSERT(not this->output_queues[i].empty());
+ ASSERT(this->output_queues.ready(i));
- const tsbe::Buffer &buff = this->output_queues[i].front();
+ const tsbe::Buffer &buff = this->output_queues.front(i);
char *mem = ((char *)buff.get_memory());
const size_t bytes = buff.get_length();
const size_t items = bytes/this->output_items_sizes[i];
@@ -173,12 +163,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
this->input_buff_offsets[i] += bytes;
- tsbe::Buffer &buff = this->input_queues[i].front();
+ tsbe::Buffer &buff = this->input_queues.front(i);
if (buff.get_length() <= this->input_buff_offsets[i])
{
- this->input_queues[i].pop();
- this->inputs_ready.set(i, not this->input_queues[i].empty());
+ this->input_queues.pop(i);
this->input_buff_offsets[i] = 0;
}
}
@@ -193,11 +182,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
- tsbe::Buffer &buff = this->output_queues[i].front();
+ tsbe::Buffer &buff = this->output_queues.front(i);
buff.get_length() = bytes;
task_iface.post_downstream(i, buff);
- this->output_queues[i].pop();
- this->outputs_ready.set(i, not this->output_queues[i].empty());
+ this->output_queues.pop(i);
}
//------------------------------------------------------------------
@@ -259,12 +247,15 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
}
//if there are inputs, and not all are provided for, and we have an empty queue, mark done
- if ((num_inputs != 0 and input_tokens_count == num_inputs and not (~this->inputs_ready).none()))
+ if (num_inputs != 0 and input_tokens_count == num_inputs and not this->input_queues.all_ready())
{
this->mark_done(task_iface);
return;
}
//still have IO ready? kick off another task
- if (this->all_io_ready()) this->block.post_msg(SelfKickMessage());
+ if (this->input_queues.all_ready() and this->output_queues.all_ready())
+ {
+ this->block.post_msg(SelfKickMessage());
+ }
}