diff options
-rw-r--r-- | lib/block_allocator.cpp | 2 | ||||
-rw-r--r-- | lib/block_task.cpp | 10 | ||||
-rw-r--r-- | lib/element.cpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 18 |
4 files changed, 17 insertions, 15 deletions
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 516729e..f98a0bd 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -38,7 +38,7 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_outputs; i++) { size_t items = this->hint; - if (items == 0) items = 1024; + if (items == 0) items = 2048; items = std::max(items, this->output_multiple_items[i]); const size_t bytes = items * this->output_items_sizes[i]; diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 0dcfca4..a649590 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -82,7 +82,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->input_queues.all_ready() and this->output_queues.all_ready() )) return; - //std::cout << "calling work on " << name << std::endl; + //std::cout << "=== calling work on " << name << " ===" << std::endl; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -130,8 +130,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { output_tokens_count += this->output_tokens[i].use_count(); - ASSERT(this->output_multiple_items[i] == 1); - ASSERT(this->output_queues.ready(i)); const tsbe::Buffer &buff = this->output_queues.front(i); char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i]; @@ -176,7 +174,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ - bool input_fully_consumed = true; + bool input_allows_flush = true; for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE); @@ -185,7 +183,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; - input_fully_consumed = input_fully_consumed and this->input_queues.pop(i, bytes); + input_allows_flush = input_allows_flush and this->input_queues.pop(i, bytes); } //------------------------------------------------------------------ @@ -203,7 +201,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //only pass output buffer downstream when the input is fully consumed... //Reasoning: For the sake of dealling with history, we can process the mini history input buffer, //and then call work again on the real input buffer, but still yield one output buffer per input buffer. - if (input_fully_consumed) + if (input_allows_flush) { tsbe::Buffer &buff = this->output_queues.front(i); buff.get_length() = this->output_bytes_offset[i]; diff --git a/lib/element.cpp b/lib/element.cpp index 34cfcf4..db0bbac 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -30,7 +30,7 @@ Element::Element(void) Element::Element(const std::string &name) { this->reset(new ElementImpl()); - //VAR(name); + VAR(name); (*this)->name = name; (*this)->unique_id = ++unique_id_pool; diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 3075348..563e5d0 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -100,14 +100,13 @@ struct InputBufferQueues } info.mem = (char *)hist_buff.get_memory() + _offset_bytes[i]; - info.len = hist_buff.get_length() - _offset_bytes[i]; + info.len = hist_buff.get_length() - _offset_bytes[i] - _history_bytes[i]; } else { - const size_t delta_bytes = _offset_bytes[i] - _history_bytes[i]; - info.mem = ((char *)buff.get_memory()) + delta_bytes; - info.len = buff.get_length() - delta_bytes; + info.mem = ((char *)buff.get_memory()) + _offset_bytes[i] - _history_bytes[i]; + info.len = buff.get_length() - _offset_bytes[i]; } return info; @@ -121,17 +120,22 @@ struct InputBufferQueues * If so, pop the input buffer, copy the tail end of the buffer * into the mini history buffer, and reset the offset condition. * - * \return true if the buffer is fully consumed + * \return true if the input allows output flushing */ inline bool pop(const size_t i, const size_t bytes_consumed) { - _offset_bytes[i] += bytes_consumed + _history_bytes[i]; + _offset_bytes[i] += bytes_consumed; //if totally consumed, memcpy history and pop const tsbe::Buffer &buff = _queues[i].front(); if (_offset_bytes[i] >= buff.get_length()) { ASSERT(_offset_bytes[i] == buff.get_length()); //bad to consume more than buffer allows + /*if (_offset_bytes[i] != buff.get_length()) + { + VAR(_offset_bytes[i]); + VAR(buff.get_length()); + }*/ if (_history_bytes[i] != 0) { char *src_mem = ((char *)buff.get_memory()) + _offset_bytes[i] - _history_bytes[i]; @@ -142,7 +146,7 @@ struct InputBufferQueues _offset_bytes[i] = 0; return true; } - return false; + return _history_bytes[i] == 0; } inline void resize(const size_t size) |