diff options
author | Josh Blum | 2012-09-02 14:23:28 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-02 14:23:28 -0700 |
commit | 3522678e9ed21ba1e880b19762669fe2b0c44064 (patch) | |
tree | 796d177a494a8c4a0127eba88fdf802bf902722b /lib | |
parent | 0a988ab506d3489e13222e9c7ddff889c6371ee6 (diff) | |
download | sandhi-3522678e9ed21ba1e880b19762669fe2b0c44064.tar.gz sandhi-3522678e9ed21ba1e880b19762669fe2b0c44064.tar.bz2 sandhi-3522678e9ed21ba1e880b19762669fe2b0c44064.zip |
checking in TODO and minor changes
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_handlers.cpp | 12 | ||||
-rw-r--r-- | lib/block_task.cpp | 23 | ||||
-rw-r--r-- | lib/element_impl.hpp | 2 |
3 files changed, 28 insertions, 9 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index fcaa359..396416d 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -125,18 +125,28 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->input_tags.resize(num_inputs); this->output_tags.resize(num_outputs); + //determine max history length for allocation below + this->max_history_items = *std::max_element( + this->input_history_items.begin(), + this->input_history_items.end() + ); + //resize and clear that initial history this->history_buffs.resize(num_inputs); + this->input_history_bytes.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { tsbe::Buffer &buff = this->history_buffs[i]; - const size_t num_bytes = this->input_items_sizes[i]*this->input_history_items[i]; + const size_t num_items = this->input_history_items[i] + this->max_history_items; + const size_t num_bytes = this->input_items_sizes[i]*num_items; + this->input_history_bytes[i] = num_bytes; if (not buff or buff.get_length() != num_bytes) { tsbe::BufferConfig config; config.memory = NULL; config.length = num_bytes; buff = tsbe::Buffer(config); + std::memset(buff.get_memory(), 0, buff.get_length()); } } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index e3efb91..a84ef6a 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -93,18 +93,16 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_inputs; i++) { input_tokens_count += this->input_tokens[i].use_count(); - //this->consume_items[i] = 0; ASSERT(this->input_history_items[i] == 0); - ASSERT(this->input_queues.ready(i)); + ASSERT(this->input_queues.ready(i)); const tsbe::Buffer &buff = this->input_queues.front(i); + ASSERT(this->input_buff_offsets[i] < buff.get_length()); char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; const size_t items = bytes/this->input_items_sizes[i]; - ASSERT(this->input_buff_offsets[i] < buff.get_length()); - this->work_io_ptr_mask |= ptrdiff_t(mem); this->input_items[i]._mem = mem; this->input_items[i]._len = items; @@ -120,10 +118,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) for (size_t i = 0; i < num_outputs; i++) { output_tokens_count += this->output_tokens[i].use_count(); - //this->produce_items[i] = 0; - ASSERT(this->output_queues.ready(i)); + ASSERT(this->output_multiple_items[i] == 1); + ASSERT(this->output_queues.ready(i)); const tsbe::Buffer &buff = this->output_queues.front(i); char *mem = ((char *)buff.get_memory()); const size_t bytes = buff.get_length(); @@ -137,13 +135,22 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) } //if we have outputs and at least one port has no downstream subscibers, mark done - if ((num_outputs != 0 and output_tokens_count == num_outputs)){ + if ((num_outputs != 0 and output_tokens_count == num_outputs)) + { this->mark_done(task_iface); return; } //------------------------------------------------------------------ - //-- forecast (TODO) and work + //-- forecast + //------------------------------------------------------------------ + if (not this->enable_fixed_rate) + { + block_ptr->forecast(num_output_items, work_ninput_items); + } + + //------------------------------------------------------------------ + //-- the work //------------------------------------------------------------------ const int ret = block_ptr->Work(this->input_items, this->output_items); const size_t noutput_items = size_t(ret); diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 333bfae..6d49033 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -74,7 +74,9 @@ struct ElementImpl std::vector<size_t> input_buff_offsets; //special buffer for dealing with history + std::vector<size_t> input_history_bytes; std::vector<tsbe::Buffer> history_buffs; + size_t max_history_items; //track the subscriber counts std::vector<Token> input_tokens; |