diff options
author | Josh Blum | 2012-10-13 13:22:18 -0700 |
---|---|---|
committer | Josh Blum | 2012-10-13 13:22:18 -0700 |
commit | 8354c9c4c92687904cedae0b0feb702c2d7e8408 (patch) | |
tree | 4c063baf0dfa3a06a6f090b2346b593babaa2efb | |
parent | f9c854d3cdd0a9337b41842d15a13d6df4ece408 (diff) | |
download | sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.gz sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.bz2 sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.zip |
lots of mini tweaks for QA passing, WIP...
-rw-r--r-- | lib/block_allocator.cpp | 6 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 16 | ||||
-rw-r--r-- | lib/block_task.cpp | 14 | ||||
-rw-r--r-- | lib/gr_block.cpp | 3 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 4 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 1 | ||||
-rw-r--r-- | lib/top_block.cpp | 2 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 3 |
10 files changed, 23 insertions, 32 deletions
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 9b70824..05b35ac 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -78,12 +78,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address { MESSAGE_TRACER(); - if (this->block_state == BLOCK_STATE_DONE) - { - this->Send(0, from); //ACK - return; - } - //allocate output buffers which will also wake up the task const size_t num_outputs = this->get_num_outputs(); this->output_buffer_tokens.resize(num_outputs); diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 2c14643..c9b289b 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -27,12 +27,6 @@ void BlockActor::handle_top_active( ){ MESSAGE_TRACER(); - if (this->block_state == BLOCK_STATE_DONE) - { - this->Send(0, from); //ACK - return; - } - if (this->block_state != BLOCK_STATE_LIVE) { this->block_ptr->start(); @@ -50,10 +44,6 @@ void BlockActor::handle_top_inert( ){ MESSAGE_TRACER(); - if (this->block_state != BLOCK_STATE_DONE) - { - this->block_ptr->stop(); - } this->mark_done(); this->Send(0, from); //ACK @@ -65,12 +55,6 @@ void BlockActor::handle_top_token( ){ MESSAGE_TRACER(); - if (this->block_state == BLOCK_STATE_DONE) - { - this->Send(0, from); //ACK - return; - } - //create input tokens and send allocation hints for (size_t i = 0; i < this->get_num_inputs(); i++) { diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 66669d1..1570064 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -25,6 +25,8 @@ void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + this->block_ptr->stop(); + //flush partial output buffers to the downstream for (size_t i = 0; i < this->get_num_outputs(); i++) { @@ -103,15 +105,19 @@ void BlockActor::handle_task(void) ASSERT(this->input_queues.ready(i)); bool potential_inline; - const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline); + const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate and num_outputs, potential_inline); void *mem = buff.get(); - const size_t items = buff.length/this->input_items_sizes[i]; + size_t items = buff.length/this->input_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); this->input_items[i].get() = mem; this->input_items[i].size() = items; this->work_input_items[i] = mem; this->work_ninput_items[i] = items; + if (this->enable_fixed_rate) + { + items = std::max<int>(0, int(items) - int(this->input_configs[i].lookahead_items)); + } num_input_items = std::min(num_input_items, items); this->consume_items[i] = 0; this->consume_called[i] = false; @@ -184,11 +190,12 @@ void BlockActor::handle_task(void) { if (fcast_ninput_items[i] <= work_ninput_items[i]) continue; + const size_t work_noutput_items_last = work_noutput_items; work_noutput_items = work_noutput_items/2; //backoff regime work_noutput_items += this->output_multiple_items-1; work_noutput_items /= this->output_multiple_items; work_noutput_items *= this->output_multiple_items; - if (work_noutput_items) goto forecast_again_you_jerk; + if (work_noutput_items and work_noutput_items_last != work_noutput_items) goto forecast_again_you_jerk; //handle the case of forecast failing //TODO accumulate input here, only done if inputs done and already accumulated @@ -210,6 +217,7 @@ void BlockActor::handle_task(void) this->task_work(); } const size_t noutput_items = size_t(work_ret); + VAR(work_ret); if (work_ret == Block::WORK_DONE) { diff --git a/lib/gr_block.cpp b/lib/gr_block.cpp index 834bbd5..a384af1 100644 --- a/lib/gr_block.cpp +++ b/lib/gr_block.cpp @@ -80,7 +80,7 @@ bool gr_block::is_unaligned(void) size_t gr_block::fixed_rate_noutput_to_ninput(const size_t noutput_items) { - return size_t((noutput_items/this->relative_rate())); + return size_t(0.5 + (noutput_items/this->relative_rate())) + this->history() - 1; } size_t gr_block::interpolation(void) const @@ -91,6 +91,7 @@ size_t gr_block::interpolation(void) const void gr_block::set_interpolation(const size_t interp) { this->set_relative_rate(1.0*interp); + this->set_output_multiple(interp); } size_t gr_block::decimation(void) const diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 5dd7421..b77861b 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -114,7 +114,7 @@ struct BlockActor : Apology::Worker void trim_tags(const size_t index); GRAS_FORCE_INLINE bool any_inputs_done(void) { - if (this->inputs_done.none()) return false; + if (this->inputs_done.none() or this->input_queues.all_ready()) return false; for (size_t i = 0; i < this->get_num_inputs(); i++) { if (this->inputs_done[i] and not this->input_queues.ready(i)) diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index a3ca1dc..3666330 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -152,6 +152,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::init( _history_bytes[i] + _reserve_bytes[i], _reserve_bytes[i] ); + _reserve_bytes[i] = 1; //post bytes are the desired buffer size to escape the edge case _post_bytes[i] = std::max( @@ -202,7 +203,8 @@ GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool co //same buffer, different offset and length SBuffer buff = front; - if (conserve_history) buff.length -= _history_bytes[i]; + //if (conserve_history) ASSERT(buff.length >= _history_bytes[i]); + //if (conserve_history) buff.length -= _history_bytes[i]; //set the flag that this buffer *might* be inlined as an output buffer potential_inline = unique and (buff.length == front.length); diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index a740b2d..1f07af0 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -76,9 +76,7 @@ struct OutputBufferQueues GRAS_FORCE_INLINE void flush_all(void) { - _queues.clear(); - _queues.resize(_bitset.size()); - _bitset.reset(); + for (size_t i = 0; i < this->size(); i++) this->flush(i); } GRAS_FORCE_INLINE bool ready(const size_t i) const diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 30bb613..c2abe73 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -33,6 +33,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th { MESSAGE_TRACER(); const size_t index = message.index; + VAR(this->input_tokens[index].use_count()); //handle incoming stream buffer, push into the queue if (this->block_state == BLOCK_STATE_DONE) return; diff --git a/lib/top_block.cpp b/lib/top_block.cpp index bcce1bf..d71a4a7 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -47,6 +47,8 @@ TopBlock::TopBlock(const std::string &name): void ElementImpl::top_block_cleanup(void) { this->executor->post_all(TopInertMessage()); + this->topology->clear_all();; + this->executor->commit(); if (ARMAGEDDON) std::cerr << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n" << "xx Top Block Destroyed: " << name << "\n" diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index cc57ed6..ae9f040 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -114,7 +114,8 @@ void BlockActor::handle_update_inputs( for (size_t i = 0; i < num_inputs; i++) { input_lookahead_items[i] = this->input_configs[i].lookahead_items; - this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate)); + if (this->enable_fixed_rate) + this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate)); if (this->input_reserve_items[i] == 0) this->input_reserve_items[i] = 1; } |