diff options
author | Josh Blum | 2012-10-13 23:56:02 -0700 |
---|---|---|
committer | Josh Blum | 2012-10-13 23:56:02 -0700 |
commit | 7962eb546821ddd98f57fa4fb60a8192bf4e34df (patch) | |
tree | b797669bfbdf92a5df31bdd92d8e06befa66b74f | |
parent | e9e370f16b96563583e75855e9890c5d2d86c624 (diff) | |
download | sandhi-7962eb546821ddd98f57fa4fb60a8192bf4e34df.tar.gz sandhi-7962eb546821ddd98f57fa4fb60a8192bf4e34df.tar.bz2 sandhi-7962eb546821ddd98f57fa4fb60a8192bf4e34df.zip |
simplfied input queues, WIP, but working
-rw-r--r-- | TODO.txt | 3 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/block_task.cpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 126 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 1 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 11 |
7 files changed, 47 insertions, 99 deletions
@@ -10,6 +10,9 @@ ** not 100% sure here with the input/output edge case sizes ** forecast + input buffer accumulate +* smarter forecast +** tell user number of inputs, user says output reqs + * add hooks to specify input reserve ** automatically calculate from output multiple and rel rate diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index c9b289b..ca42b45 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -68,7 +68,7 @@ void BlockActor::handle_top_token( //tell the upstream about the input requirements OutputHintMessage output_hints; output_hints.history_bytes = this->input_configs[i].lookahead_items*this->input_items_sizes[i]; - output_hints.reserve_bytes = this->input_reserve_items[i]; + output_hints.reserve_bytes = size_t(std::ceil(this->output_multiple_items/this->relative_rate)); output_hints.token = this->input_tokens[i]; this->post_upstream(i, output_hints); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 96283d7..935f148 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -105,7 +105,7 @@ void BlockActor::handle_task(void) ASSERT(this->input_queues.ready(i)); bool potential_inline; - const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate and num_outputs, potential_inline); + const SBuffer buff = this->input_queues.front(i, potential_inline); void *mem = buff.get(); size_t items = buff.length/this->input_items_sizes[i]; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index b77861b..71fd96e 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -130,7 +130,6 @@ struct BlockActor : Apology::Worker std::vector<size_t> output_items_sizes; std::vector<InputPortConfig> input_configs; std::vector<OutputPortConfig> output_configs; - std::vector<size_t> input_reserve_items; size_t output_multiple_items; //keeps track of production diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 3666330..2789f92 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -33,20 +33,17 @@ namespace gnuradio struct InputBufferQueues { enum {MAX_QUEUE_SIZE = 128}; + enum {MAX_AUX_BUFF_BYTES=(1<<16)}; ~InputBufferQueues(void) { this->resize(0); } - void init( - const std::vector<size_t> &input_history_items, - const std::vector<size_t> &input_reserve_items, - const std::vector<size_t> &input_item_sizes - ); + void update_history_bytes(const size_t i, const size_t hist_bytes); //! Call to get an input buffer for work - SBuffer front(const size_t i, const bool conserve_history, bool &potential_GRAS_FORCE_INLINE); + SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE); //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); @@ -98,17 +95,14 @@ struct InputBufferQueues GRAS_FORCE_INLINE void __update(const size_t i) { - _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]); + _bitset.set(i, _enqueued_bytes[i] != 0); } BitSet _bitset; std::vector<size_t> _enqueued_bytes; std::vector<boost::circular_buffer<SBuffer> > _queues; std::vector<size_t> _history_bytes; - std::vector<size_t> _reserve_bytes; - std::vector<size_t> _post_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; - std::vector<bool> _in_aux_buff; }; @@ -118,93 +112,56 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) _enqueued_bytes.resize(size, 0); _queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE)); _history_bytes.resize(size, 0); - _reserve_bytes.resize(size, 0); - _post_bytes.resize(size, 0); _aux_queues.resize(size); - _in_aux_buff.resize(size, false); -} - -GRAS_FORCE_INLINE void InputBufferQueues::init( - const std::vector<size_t> &input_history_items, - const std::vector<size_t> &input_reserve_items, - const std::vector<size_t> &input_item_sizes -){ - if (this->size() == 0) return; - - const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); for (size_t i = 0; i < this->size(); i++) { - ASSERT(input_reserve_items[i] > 0); - + if (_aux_queues[i]) continue; _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); + } - //determine byte sizes for buffers and dealing with history - const size_t old_history = _history_bytes[i]; - _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; - - //calculate the input reserve aka reserve size - _reserve_bytes[i] = input_item_sizes[i]*input_reserve_items[i]; - _reserve_bytes[i] = std::max(size_t(1), _reserve_bytes[i]); - - //calculate the input reserve aka reserve size - _reserve_bytes[i] = std::max( - _history_bytes[i] + _reserve_bytes[i], - _reserve_bytes[i] - ); - _reserve_bytes[i] = 1; - - //post bytes are the desired buffer size to escape the edge case - _post_bytes[i] = std::max( - input_item_sizes[i]*max_history_items + _reserve_bytes[i], - _reserve_bytes[i] - ); - - //allocate mini buffers for history edge conditions - size_t num_bytes = _post_bytes[i]; - _aux_queues[i]->allocate_one(num_bytes); - _aux_queues[i]->allocate_one(num_bytes); - - //there is history, so enqueue some initial history - if (_history_bytes[i] > old_history) - { - SBuffer buff = _aux_queues[i]->front(); - _aux_queues[i]->pop(); +} - const size_t delta = _history_bytes[i] - old_history; - std::memset(buff.get_actual_memory(), 0, delta); - buff.offset = 0; - buff.length = delta; +inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes) +{ + //there is history, so enqueue some initial history + if (hist_bytes > _history_bytes[i]) + { + SBuffer buff = _aux_queues[i]->front(); + _aux_queues[i]->pop(); - this->push(i, buff); - _in_aux_buff[i] = true; - } - if (_history_bytes[i] < old_history) - { - size_t delta = old_history - _history_bytes[i]; - delta = std::min(delta, _enqueued_bytes[i]); //FIXME - //TODO consume extra delta on push...? so we dont need std::min - this->consume(i, delta); - } + const size_t delta = hist_bytes - _history_bytes[i]; + std::memset(buff.get_actual_memory(), 0, delta); + buff.offset = 0; + buff.length = delta; + + this->push(i, buff); + } + if (hist_bytes < _history_bytes[i]) + { + size_t delta = _history_bytes[i] - hist_bytes; + delta = std::min(delta, _enqueued_bytes[i]); //FIXME + //TODO consume extra delta on push...? so we dont need std::min + this->consume(i, delta); } -} + _history_bytes[i] = hist_bytes; +} -GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool conserve_history, bool &potential_inline) +GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline) { //if (_queues[i].empty()) return BuffInfo(); ASSERT(not _queues[i].empty()); ASSERT(this->ready(i)); __prepare(i); - ASSERT(_queues[i].front().length >= _history_bytes[i]); SBuffer &front = _queues[i].front(); const bool unique = front.unique(); //same buffer, different offset and length SBuffer buff = front; - //if (conserve_history) ASSERT(buff.length >= _history_bytes[i]); - //if (conserve_history) buff.length -= _history_bytes[i]; //set the flag that this buffer *might* be inlined as an output buffer potential_inline = unique and (buff.length == front.length); @@ -234,7 +191,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) - + /* @@ -284,6 +241,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i) //destination buffer is the new front of the queue _queues[i].push_front(dst); } + */ } @@ -298,12 +256,14 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b _queues[i].front().offset += bytes_consumed; _queues[i].front().length -= bytes_consumed; + ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length()); + //safe to pop here when the buffer is consumed and no history - if (_queues[i].front().length == 0 and _history_bytes[i] == 0) + //if (_queues[i].front().length == 0/* and _history_bytes[i] == 0*/) { - _queues[i].pop_front(); + // _queues[i].pop_front(); } - +/* else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i]) { const SBuffer buff = _queues[i].front(); @@ -321,18 +281,12 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b _queues[i].front().offset -= residual; } } + */ //update the number of bytes in this queue ASSERT(_enqueued_bytes[i] >= bytes_consumed); _enqueued_bytes[i] -= bytes_consumed; - //we have consumed the history, change reqs - if (_enqueued_bytes[i] < _history_bytes[i]) - { - _history_bytes[i] = 0; - _reserve_bytes[i] = 1; //cant be 0 - } - __update(i); } diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index c2abe73..30bb613 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -33,7 +33,6 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th { MESSAGE_TRACER(); const size_t index = message.index; - VAR(this->input_tokens[index].use_count()); //handle incoming stream buffer, push into the queue if (this->block_state == BLOCK_STATE_DONE) return; diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index eba9bca..5ad9378 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -111,16 +111,9 @@ void BlockActor::handle_update_inputs( this->input_queues.resize(num_inputs); //impose input reserve requirements based on relative rate and output multiple - resize_fill_grow(this->input_reserve_items, num_inputs, 1); - std::vector<size_t> input_lookahead_items(num_inputs); for (size_t i = 0; i < num_inputs; i++) { - input_lookahead_items[i] = this->input_configs[i].lookahead_items; - if (this->enable_fixed_rate) - this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate)); - if (this->input_reserve_items[i] == 0) this->input_reserve_items[i] = 1; + const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items; + this->input_queues.update_history_bytes(i, hist_bytes); } - - //init the history comprehension on input queues - this->input_queues.init(input_lookahead_items, this->input_reserve_items, this->input_items_sizes); } |