summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-08-29 23:15:21 -0700
committerJosh Blum2012-08-29 23:15:21 -0700
commit36f216977ff79a72b3c5498162659050bc7552ad (patch)
tree78d13d782f86d895138aefa690b0f056093ece72 /lib/block_task.cpp
parent483a44a31f6e02ae0cadfc41710f052d9e48fc6c (diff)
downloadsandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.gz
sandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.bz2
sandhi-36f216977ff79a72b3c5498162659050bc7552ad.zip
using port messages and implement buffer queues in house
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp24
1 files changed, 14 insertions, 10 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 577fdb7..d871f12 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -24,10 +24,11 @@ void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface)
{
for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
{
- while (task_iface.get_input_buffer(i))
+ while (not this->input_queues[i].empty())
{
- task_iface.pop_input_buffer(i);
+ this->input_queues[i].pop();
}
+ this->inputs_ready.set(i, false);
}
}
@@ -38,6 +39,7 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
this->token_pool.clear();
this->token.reset();
this->free_inputs(task_iface);
+ this->output_buffer_tokens.clear();
for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
{
task_iface.post_upstream(i, CheckTokensMessage());
@@ -63,8 +65,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- Handle task may get called for incoming buffers,
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
- const bool all_inputs_ready = (~task_iface.get_inputs_ready()).none();
- const bool all_outputs_ready = (~task_iface.get_outputs_ready()).none();
+ const bool all_inputs_ready = (~this->inputs_ready).none();
+ const bool all_outputs_ready = (~this->outputs_ready).none();
if (not (this->active and all_inputs_ready and all_outputs_ready)) return;
const size_t num_inputs = task_iface.get_num_inputs();
@@ -97,7 +99,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
ASSERT(this->input_history_items[i] == 0);
- const tsbe::Buffer &buff = task_iface.get_input_buffer(i);
+ const tsbe::Buffer &buff = this->input_queues[i].front();
char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i];
const size_t bytes = buff.get_length() - this->input_buff_offsets[i];
const size_t items = bytes/this->input_items_sizes[i];
@@ -115,7 +117,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
output_tokens_count += this->output_tokens[i].use_count();
//this->produce_items[i] = 0;
- const tsbe::Buffer &buff = task_iface.get_output_buffer(i);
+ const tsbe::Buffer &buff = this->output_queues[i].front();
char *mem = ((char *)buff.get_memory());
const size_t bytes = buff.get_length();
const size_t items = bytes/this->output_items_sizes[i];
@@ -156,10 +158,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
this->input_buff_offsets[i] += bytes;
- tsbe::Buffer &buff = task_iface.get_input_buffer(i);
+ tsbe::Buffer &buff = this->input_queues[i].front();
if (buff.get_length() >= this->input_buff_offsets[i])
{
- task_iface.pop_input_buffer(i);
+ this->input_queues[i].pop();
+ this->inputs_ready.set(i, not this->input_queues[i].empty());
this->input_buff_offsets[i] = 0;
}
}
@@ -170,10 +173,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
- tsbe::Buffer &buff = task_iface.get_output_buffer(i);
+ tsbe::Buffer &buff = this->output_queues[i].front();
buff.get_length() = bytes;
task_iface.post_downstream(i, buff);
- task_iface.pop_output_buffer(i);
+ this->output_queues[i].pop();
+ this->outputs_ready.set(i, not this->output_queues[i].empty());
}
//0) figure out what we have for input data