diff options
author | Josh Blum | 2012-08-29 23:15:21 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-29 23:15:21 -0700 |
commit | 36f216977ff79a72b3c5498162659050bc7552ad (patch) | |
tree | 78d13d782f86d895138aefa690b0f056093ece72 /lib/block_task.cpp | |
parent | 483a44a31f6e02ae0cadfc41710f052d9e48fc6c (diff) | |
download | sandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.gz sandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.bz2 sandhi-36f216977ff79a72b3c5498162659050bc7552ad.zip |
using port messages and implement buffer queues in house
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 577fdb7..d871f12 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -24,10 +24,11 @@ void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface) { for (size_t i = 0; i < task_iface.get_num_inputs(); i++) { - while (task_iface.get_input_buffer(i)) + while (not this->input_queues[i].empty()) { - task_iface.pop_input_buffer(i); + this->input_queues[i].pop(); } + this->inputs_ready.set(i, false); } } @@ -38,6 +39,7 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) this->token_pool.clear(); this->token.reset(); this->free_inputs(task_iface); + this->output_buffer_tokens.clear(); for (size_t i = 0; i < task_iface.get_num_inputs(); i++) { task_iface.post_upstream(i, CheckTokensMessage()); @@ -63,8 +65,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Handle task may get called for incoming buffers, //-- however, not all ports may have available buffers. //------------------------------------------------------------------ - const bool all_inputs_ready = (~task_iface.get_inputs_ready()).none(); - const bool all_outputs_ready = (~task_iface.get_outputs_ready()).none(); + const bool all_inputs_ready = (~this->inputs_ready).none(); + const bool all_outputs_ready = (~this->outputs_ready).none(); if (not (this->active and all_inputs_ready and all_outputs_ready)) return; const size_t num_inputs = task_iface.get_num_inputs(); @@ -97,7 +99,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) ASSERT(this->input_history_items[i] == 0); - const tsbe::Buffer &buff = task_iface.get_input_buffer(i); + const tsbe::Buffer &buff = this->input_queues[i].front(); char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; const size_t items = bytes/this->input_items_sizes[i]; @@ -115,7 +117,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) output_tokens_count += this->output_tokens[i].use_count(); //this->produce_items[i] = 0; - const tsbe::Buffer &buff = task_iface.get_output_buffer(i); + const tsbe::Buffer &buff = this->output_queues[i].front(); char *mem = ((char *)buff.get_memory()); const size_t bytes = buff.get_length(); const size_t items = bytes/this->output_items_sizes[i]; @@ -156,10 +158,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; this->input_buff_offsets[i] += bytes; - tsbe::Buffer &buff = task_iface.get_input_buffer(i); + tsbe::Buffer &buff = this->input_queues[i].front(); if (buff.get_length() >= this->input_buff_offsets[i]) { - task_iface.pop_input_buffer(i); + this->input_queues[i].pop(); + this->inputs_ready.set(i, not this->input_queues[i].empty()); this->input_buff_offsets[i] = 0; } } @@ -170,10 +173,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; - tsbe::Buffer &buff = task_iface.get_output_buffer(i); + tsbe::Buffer &buff = this->output_queues[i].front(); buff.get_length() = bytes; task_iface.post_downstream(i, buff); - task_iface.pop_output_buffer(i); + this->output_queues[i].pop(); + this->outputs_ready.set(i, not this->output_queues[i].empty()); } //0) figure out what we have for input data |