diff options
authorJosh Blum2012-10-13 23:56:02 -0700
committerJosh Blum2012-10-13 23:56:02 -0700
commit7962eb546821ddd98f57fa4fb60a8192bf4e34df (patch)
parente9e370f16b96563583e75855e9890c5d2d86c624 (diff)
simplfied input queues, WIP, but working
7 files changed, 47 insertions, 99 deletions
diff --git a/TODO.txt b/TODO.txt
index 1c9fe17..ed6ccf5 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -10,6 +10,9 @@
** not 100% sure here with the input/output edge case sizes
** forecast + input buffer accumulate
+* smarter forecast
+** tell user number of inputs, user says output reqs
* add hooks to specify input reserve
** automatically calculate from output multiple and rel rate
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index c9b289b..ca42b45 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -68,7 +68,7 @@ void BlockActor::handle_top_token(
//tell the upstream about the input requirements
OutputHintMessage output_hints;
output_hints.history_bytes = this->input_configs[i].lookahead_items*this->input_items_sizes[i];
- output_hints.reserve_bytes = this->input_reserve_items[i];
+ output_hints.reserve_bytes = size_t(std::ceil(this->output_multiple_items/this->relative_rate));
output_hints.token = this->input_tokens[i];
this->post_upstream(i, output_hints);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 96283d7..935f148 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -105,7 +105,7 @@ void BlockActor::handle_task(void)
bool potential_inline;
- const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate and num_outputs, potential_inline);
+ const SBuffer buff = this->input_queues.front(i, potential_inline);
void *mem = buff.get();
size_t items = buff.length/this->input_items_sizes[i];
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index b77861b..71fd96e 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -130,7 +130,6 @@ struct BlockActor : Apology::Worker
std::vector<size_t> output_items_sizes;
std::vector<InputPortConfig> input_configs;
std::vector<OutputPortConfig> output_configs;
- std::vector<size_t> input_reserve_items;
size_t output_multiple_items;
//keeps track of production
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 3666330..2789f92 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -33,20 +33,17 @@ namespace gnuradio
struct InputBufferQueues
enum {MAX_QUEUE_SIZE = 128};
+ enum {MAX_AUX_BUFF_BYTES=(1<<16)};
- void init(
- const std::vector<size_t> &input_history_items,
- const std::vector<size_t> &input_reserve_items,
- const std::vector<size_t> &input_item_sizes
- );
+ void update_history_bytes(const size_t i, const size_t hist_bytes);
//! Call to get an input buffer for work
- SBuffer front(const size_t i, const bool conserve_history, bool &potential_GRAS_FORCE_INLINE);
+ SBuffer front(const size_t i, bool &potential_GRAS_FORCE_INLINE);
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
@@ -98,17 +95,14 @@ struct InputBufferQueues
GRAS_FORCE_INLINE void __update(const size_t i)
- _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]);
+ _bitset.set(i, _enqueued_bytes[i] != 0);
BitSet _bitset;
std::vector<size_t> _enqueued_bytes;
std::vector<boost::circular_buffer<SBuffer> > _queues;
std::vector<size_t> _history_bytes;
- std::vector<size_t> _reserve_bytes;
- std::vector<size_t> _post_bytes;
std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
- std::vector<bool> _in_aux_buff;
@@ -118,93 +112,56 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
_enqueued_bytes.resize(size, 0);
_queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE));
_history_bytes.resize(size, 0);
- _reserve_bytes.resize(size, 0);
- _post_bytes.resize(size, 0);
- _in_aux_buff.resize(size, false);
-GRAS_FORCE_INLINE void InputBufferQueues::init(
- const std::vector<size_t> &input_history_items,
- const std::vector<size_t> &input_reserve_items,
- const std::vector<size_t> &input_item_sizes
- if (this->size() == 0) return;
- const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
for (size_t i = 0; i < this->size(); i++)
- ASSERT(input_reserve_items[i] > 0);
+ if (_aux_queues[i]) continue;
_aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ }
- //determine byte sizes for buffers and dealing with history
- const size_t old_history = _history_bytes[i];
- _history_bytes[i] = input_item_sizes[i]*input_history_items[i];
- //calculate the input reserve aka reserve size
- _reserve_bytes[i] = input_item_sizes[i]*input_reserve_items[i];
- _reserve_bytes[i] = std::max(size_t(1), _reserve_bytes[i]);
- //calculate the input reserve aka reserve size
- _reserve_bytes[i] = std::max(
- _history_bytes[i] + _reserve_bytes[i],
- _reserve_bytes[i]
- );
- _reserve_bytes[i] = 1;
- //post bytes are the desired buffer size to escape the edge case
- _post_bytes[i] = std::max(
- input_item_sizes[i]*max_history_items + _reserve_bytes[i],
- _reserve_bytes[i]
- );
- //allocate mini buffers for history edge conditions
- size_t num_bytes = _post_bytes[i];
- _aux_queues[i]->allocate_one(num_bytes);
- _aux_queues[i]->allocate_one(num_bytes);
- //there is history, so enqueue some initial history
- if (_history_bytes[i] > old_history)
- {
- SBuffer buff = _aux_queues[i]->front();
- _aux_queues[i]->pop();
- const size_t delta = _history_bytes[i] - old_history;
- std::memset(buff.get_actual_memory(), 0, delta);
- buff.offset = 0;
- buff.length = delta;
+inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes)
+ //there is history, so enqueue some initial history
+ if (hist_bytes > _history_bytes[i])
+ {
+ SBuffer buff = _aux_queues[i]->front();
+ _aux_queues[i]->pop();
- this->push(i, buff);
- _in_aux_buff[i] = true;
- }
- if (_history_bytes[i] < old_history)
- {
- size_t delta = old_history - _history_bytes[i];
- delta = std::min(delta, _enqueued_bytes[i]); //FIXME
- //TODO consume extra delta on push...? so we dont need std::min
- this->consume(i, delta);
- }
+ const size_t delta = hist_bytes - _history_bytes[i];
+ std::memset(buff.get_actual_memory(), 0, delta);
+ buff.offset = 0;
+ buff.length = delta;
+ this->push(i, buff);
+ }
+ if (hist_bytes < _history_bytes[i])
+ {
+ size_t delta = _history_bytes[i] - hist_bytes;
+ delta = std::min(delta, _enqueued_bytes[i]); //FIXME
+ //TODO consume extra delta on push...? so we dont need std::min
+ this->consume(i, delta);
+ _history_bytes[i] = hist_bytes;
-GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool conserve_history, bool &potential_inline)
+GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline)
//if (_queues[i].empty()) return BuffInfo();
ASSERT(not _queues[i].empty());
- ASSERT(_queues[i].front().length >= _history_bytes[i]);
SBuffer &front = _queues[i].front();
const bool unique = front.unique();
//same buffer, different offset and length
SBuffer buff = front;
- //if (conserve_history) ASSERT(buff.length >= _history_bytes[i]);
- //if (conserve_history) buff.length -= _history_bytes[i];
//set the flag that this buffer *might* be inlined as an output buffer
potential_inline = unique and (buff.length == front.length);
@@ -234,7 +191,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i)
+ /*
@@ -284,6 +241,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i)
//destination buffer is the new front of the queue
+ */
@@ -298,12 +256,14 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
_queues[i].front().offset += bytes_consumed;
_queues[i].front().length -= bytes_consumed;
+ ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length());
//safe to pop here when the buffer is consumed and no history
- if (_queues[i].front().length == 0 and _history_bytes[i] == 0)
+ //if (_queues[i].front().length == 0/* and _history_bytes[i] == 0*/)
- _queues[i].pop_front();
+ // _queues[i].pop_front();
else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i])
const SBuffer buff = _queues[i].front();
@@ -321,18 +281,12 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
_queues[i].front().offset -= residual;
+ */
//update the number of bytes in this queue
ASSERT(_enqueued_bytes[i] >= bytes_consumed);
_enqueued_bytes[i] -= bytes_consumed;
- //we have consumed the history, change reqs
- if (_enqueued_bytes[i] < _history_bytes[i])
- {
- _history_bytes[i] = 0;
- _reserve_bytes[i] = 1; //cant be 0
- }
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index c2abe73..30bb613 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -33,7 +33,6 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
const size_t index = message.index;
- VAR(this->input_tokens[index].use_count());
//handle incoming stream buffer, push into the queue
if (this->block_state == BLOCK_STATE_DONE) return;
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index eba9bca..5ad9378 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -111,16 +111,9 @@ void BlockActor::handle_update_inputs(
//impose input reserve requirements based on relative rate and output multiple
- resize_fill_grow(this->input_reserve_items, num_inputs, 1);
- std::vector<size_t> input_lookahead_items(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
- input_lookahead_items[i] = this->input_configs[i].lookahead_items;
- if (this->enable_fixed_rate)
- this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate));
- if (this->input_reserve_items[i] == 0) this->input_reserve_items[i] = 1;
+ const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items;
+ this->input_queues.update_history_bytes(i, hist_bytes);
- //init the history comprehension on input queues
- this->input_queues.init(input_lookahead_items, this->input_reserve_items, this->input_items_sizes);